blob: 6822032fe0df33604e53fbe980576139eb2877d4 [file] [log] [blame]
/*-------------------------------------------------------------------------
*
* createas.c
* Execution of CREATE TABLE ... AS, a/k/a SELECT INTO.
* Since CREATE MATERIALIZED VIEW shares syntax and most behaviors,
* we implement that here, too.
*
* We implement this by diverting the query's normal output to a
* specialized DestReceiver type.
*
* Formerly, CTAS was implemented as a variant of SELECT, which led
* to assorted legacy behaviors that we still try to preserve, notably that
* we must return a tuples-processed count in the QueryCompletion. (We no
* longer do that for CTAS ... WITH NO DATA, however.)
*
* Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
*
* IDENTIFICATION
* src/backend/commands/createas.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/heapam.h"
#include "access/htup_details.h"
#include "access/reloptions.h"
#include "access/sysattr.h"
#include "access/tableam.h"
#include "access/xact.h"
#include "access/xlog.h"
#include "catalog/namespace.h"
#include "catalog/index.h"
#include "catalog/pg_constraint.h"
#include "catalog/pg_inherits.h"
#include "catalog/pg_trigger.h"
#include "catalog/toasting.h"
#include "commands/createas.h"
#include "commands/defrem.h"
#include "commands/matview.h"
#include "commands/prepare.h"
#include "commands/tablecmds.h"
#include "commands/tablespace.h"
#include "commands/taskcmds.h"
#include "commands/trigger.h"
#include "commands/view.h"
#include "miscadmin.h"
#include "optimizer/optimizer.h"
#include "optimizer/prep.h"
#include "nodes/makefuncs.h"
#include "nodes/nodeFuncs.h"
#include "parser/parser.h"
#include "parser/parsetree.h"
#include "parser/parse_clause.h"
#include "parser/parse_func.h"
#include "parser/parse_type.h"
#include "postmaster/autostats.h"
#include "rewrite/rewriteHandler.h"
#include "rewrite/rewriteManip.h"
#include "storage/smgr.h"
#include "tcop/tcopprot.h"
#include "tcop/utility.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/regproc.h"
#include "utils/fmgroids.h"
#include "utils/rel.h"
#include "utils/resgroup.h"
#include "utils/rls.h"
#include "utils/snapmgr.h"
#include "utils/syscache.h"
#include "catalog/gp_matview_aux.h"
#include "catalog/oid_dispatch.h"
#include "catalog/pg_task.h"
#include "cdb/cdbappendonlyam.h"
#include "cdb/cdbaocsam.h"
#include "cdb/cdbdisp_query.h"
#include "cdb/cdboidsync.h"
#include "cdb/cdbutil.h"
#include "cdb/cdbvars.h"
#include "cdb/memquota.h"
#include "utils/metrics_utils.h"
/*
 * DR_intorel -- private state of the DestReceiver that stores query output
 * into the relation being created.
 *
 * "pub" is the first member: the routines in this file cast freely between
 * DestReceiver * and DR_intorel *.
 */
typedef struct
{
DestReceiver pub; /* publicly-known function pointers */
IntoClause *into; /* target relation specification */
/* These fields are filled by intorel_initplan: */
Relation rel; /* relation to write to */
ObjectAddress reladdr; /* address of rel, for ExecCreateTableAs */
CommandId output_cid; /* cmin to insert in output tuples */
int ti_options; /* table_tuple_insert performance options */
BulkInsertState bistate; /* bulk insert state; NULL when WITH NO DATA */
} DR_intorel;
/*
 * Context struct taken by check_ivm_restriction_walker (see its prototype
 * below).
 */
typedef struct
{
bool has_agg; /* presumably set once an aggregate is seen -- walker body not shown here, confirm */
} check_ivm_restriction_context;
/* installed as the rStartup callback by CreateIntoRelDestReceiver */
static void intorel_startup_dummy(DestReceiver *self, int operation, TupleDesc typeinfo);
/* utility functions for CTAS definition creation */
static ObjectAddress create_ctas_internal(List *attrList, IntoClause *into,
QueryDesc *queryDesc, bool dispatch);
static ObjectAddress create_ctas_nodata(List *tlist, IntoClause *into, QueryDesc *queryDesc);
/* DestReceiver routines for collecting data */
static bool intorel_receive(TupleTableSlot *slot, DestReceiver *self);
static void intorel_shutdown(DestReceiver *self);
static void intorel_destroy(DestReceiver *self);
/* helpers for incrementally maintainable materialized views (IVM) */
static void CreateIvmTriggersOnBaseTablesRecurse(Query *qry, Node *node, Oid matviewOid,
Relids *relids, bool ex_lock);
static void CreateIvmTrigger(Oid relOid, Oid viewOid, int16 type, int16 timing, bool ex_lock);
static void check_ivm_restriction(Node *node);
static bool check_ivm_restriction_walker(Node *node, check_ivm_restriction_context *context);
static Bitmapset *get_primary_key_attnos_from_query(Query *query, List **constraintList);
static bool check_aggregate_supports_ivm(Oid aggfnoid);
/* dynamic table support */
static void create_dynamic_table_auto_refresh_task(ParseState *pstate, Relation DynamicTableRel, char *schedule);
/*
 * create_ctas_internal
 *
 * Internal utility used for the creation of the definition of a relation
 * created via CREATE TABLE AS or a materialized view. Caller needs to
 * provide a list of attributes (ColumnDef nodes).
 *
 * attrList:  ColumnDef nodes for the new relation's columns; only consulted
 *            when not running in the GP_ROLE_EXECUTE role.
 * into:      target specification; a non-NULL into->viewQuery selects the
 *            materialized-view flavor.
 * queryDesc: supplies plannedstmt->intoPolicy and, via queryDesc->ddesc, the
 *            CreateStmt that is saved by the dispatcher and reused by
 *            executors.
 * dispatch:  if true and we are in the GP_ROLE_DISPATCH role, the CreateStmt
 *            is sent out with CdbDispatchUtilityStatement() before returning.
 *
 * Returns the ObjectAddress produced by DefineRelation().
 */
static ObjectAddress
create_ctas_internal(List *attrList, IntoClause *into, QueryDesc *queryDesc, bool dispatch)
{
CreateStmt *create = makeNode(CreateStmt);
bool is_matview;
char relkind;
Datum toast_options;
static char *validnsps[] = HEAP_RELOPT_NAMESPACES;
ObjectAddress intoRelationAddr;
/* Sync OIDs for into relation, if any */
cdb_sync_oid_to_segments();
/* This code supports both CREATE TABLE AS and CREATE MATERIALIZED VIEW */
is_matview = (into->viewQuery != NULL);
relkind = is_matview ? RELKIND_MATVIEW : RELKIND_RELATION;
if (Gp_role == GP_ROLE_EXECUTE)
{
/*
 * Reuse the CreateStmt carried in the dispatch descriptor; the node
 * allocated above is simply left unused in this role.
 */
create = queryDesc->ddesc->intoCreateStmt;
}
/* funny indentation to avoid re-indenting a lot of upstream code */
else
{
/*
* Create the target relation by faking up a CREATE TABLE parsetree and
* passing it to DefineRelation.
*/
create->relation = into->rel;
create->tableElts = attrList;
create->inhRelations = NIL;
create->ofTypename = NULL;
create->constraints = NIL;
create->options = into->options;
create->oncommit = into->onCommit;
create->tablespacename = into->tableSpaceName;
create->if_not_exists = false;
create->origin = ORIGIN_NO_GEN;
create->distributedBy = NULL; /* We will pass a pre-made intoPolicy instead */
create->partitionBy = NULL; /* CTAS does not support partitioning. */
create->buildAoBlkdir = false;
create->attr_encodings = NULL; /* filled in by DefineRelation */
/* Save them in CreateStmt for dispatching. */
create->relKind = relkind;
create->ownerid = GetUserId();
create->accessMethod = into->accessMethod;
create->isCtas = true;
create->intoQuery = into->viewQuery;
create->intoPolicy = queryDesc->plannedstmt->intoPolicy;
}
/* end of funny indentation */
/*
* Create the relation. (This will error out if there's an existing view,
* so we don't need more code to complain if "replace" is false.)
*
* Don't dispatch it yet, as we haven't created the toast and other
* auxiliary tables yet.
*
* Pass the policy that was computed by the planner.
*/
intoRelationAddr = DefineRelation(create,
relkind,
InvalidOid,
NULL,
NULL,
false,
queryDesc->ddesc ? queryDesc->ddesc->useChangedAOOpts : true,
queryDesc->plannedstmt->intoPolicy);
/*
 * Remember the CreateStmt in the dispatch descriptor so that the
 * GP_ROLE_EXECUTE branch above can pick it up.
 *
 * NOTE(review): ddesc is dereferenced unconditionally here although the
 * DefineRelation() call above tolerates a NULL ddesc -- confirm that ddesc
 * is always set when running in the dispatcher role.
 */
if (Gp_role == GP_ROLE_DISPATCH)
{
queryDesc->ddesc->intoCreateStmt = create;
}
/*
* If necessary, create a TOAST table for the target table. Note that
* NewRelationCreateToastTable ends with CommandCounterIncrement(), so
* that the TOAST table will be visible for insertion.
*/
CommandCounterIncrement();
/* parse and validate reloptions for the toast table */
toast_options = transformRelOptions((Datum) 0,
create->options,
"toast",
validnsps,
true, false);
NewRelationCreateToastTable(intoRelationAddr.objectId, toast_options);
/* Create the "view" part of a materialized view. */
if (is_matview)
{
/* StoreViewQuery scribbles on tree, so make a copy */
Query *query = (Query *) copyObject(into->viewQuery);
StoreViewQuery(intoRelationAddr.objectId, query, false);
CommandCounterIncrement();
}
/*
 * Now that the relation and its auxiliary objects exist locally, dispatch
 * the CREATE if the caller asked for it.
 */
if (Gp_role == GP_ROLE_DISPATCH && dispatch)
CdbDispatchUtilityStatement((Node *) create,
DF_CANCEL_ON_ERROR |
DF_NEED_TWO_PHASE |
DF_WITH_SNAPSHOT,
GetAssignedOidsForDispatch(),
NULL);
return intoRelationAddr;
}
/*
 * create_ctas_nodata
 *
 * Define the target of CREATE TABLE AS / CREATE MATERIALIZED VIEW when
 * WITH NO DATA was given.  Columns are derived from the non-junk entries of
 * the supplied targetlist; names listed in into->colNames take precedence
 * over the names chosen by the query (fewer names than columns is fine,
 * more is an error).  The relation definition is dispatched immediately.
 */
static ObjectAddress
create_ctas_nodata(List *tlist, IntoClause *into, QueryDesc *queryDesc)
{
	List	   *columns = NIL;
	ListCell   *name_cell = list_head(into->colNames);
	ListCell   *tle_cell;

	foreach(tle_cell, tlist)
	{
		TargetEntry *entry = (TargetEntry *) lfirst(tle_cell);
		Node	   *expr;
		ColumnDef  *coldef;
		char	   *name;

		/* junk entries never become columns */
		if (entry->resjunk)
			continue;

		/* consume an explicit column name while any remain */
		if (name_cell != NULL)
		{
			name = strVal(lfirst(name_cell));
			name_cell = lnext(into->colNames, name_cell);
		}
		else
			name = entry->resname;

		expr = (Node *) entry->expr;
		coldef = makeColumnDef(name,
							   exprType(expr),
							   exprTypmod(expr),
							   exprCollation(expr));

		/*
		 * A collatable column whose collation could not be resolved must be
		 * rejected here: DefineRelation would silently fall back to the
		 * type's default collation instead of complaining.
		 */
		if (!OidIsValid(coldef->collOid))
		{
			if (type_is_collatable(coldef->typeName->typeOid))
				ereport(ERROR,
						(errcode(ERRCODE_INDETERMINATE_COLLATION),
						 errmsg("no collation was derived for column \"%s\" with collatable type %s",
								coldef->colname,
								format_type_be(coldef->typeName->typeOid)),
						 errhint("Use the COLLATE clause to set the collation explicitly.")));
		}

		columns = lappend(columns, coldef);
	}

	/* leftover names mean the user listed more names than there are columns */
	if (name_cell != NULL)
		ereport(ERROR,
				(errcode(ERRCODE_SYNTAX_ERROR),
				 errmsg("too many column names were specified")));

	/* build the relation from the ColumnDef list, dispatching the definition */
	return create_ctas_internal(columns, into, queryDesc, true);
}
/*
 * ExecCreateTableAs -- execute a CREATE TABLE AS command
 *
 * Also handles CREATE MATERIALIZED VIEW (into->viewQuery != NULL), including
 * the IVM (into->ivm) and dynamic table (into->dynamicTbl) variants.
 *
 * pstate:   parse state; its p_sourcetext is handed to the planner and the
 *           QueryDesc, and it is passed on to ExecuteQuery and to the
 *           dynamic-table task creation.
 * stmt:     the statement; stmt->query is either a SELECT Query or a utility
 *           Query wrapping an ExecuteStmt.
 * params, queryEnv: passed through to planning/execution.
 * qc:       if non-NULL and the query is actually run, receives CMDTAG_SELECT
 *           and the processed-row count.
 *
 * Returns the address of the created relation, or InvalidObjectAddress if the
 * relation already exists and IF NOT EXISTS was specified.  Must not be
 * called in the GP_ROLE_EXECUTE role (asserted below).
 */
ObjectAddress
ExecCreateTableAs(ParseState *pstate, CreateTableAsStmt *stmt,
ParamListInfo params, QueryEnvironment *queryEnv,
QueryCompletion *qc)
{
Query *query = castNode(Query, stmt->query);
IntoClause *into = stmt->into;
bool is_matview = (into->viewQuery != NULL);
DestReceiver *dest;
Oid save_userid = InvalidOid;
int save_sec_context = 0;
int save_nestlevel = 0;
ObjectAddress address;
List *rewritten;
PlannedStmt *plan;
QueryDesc *queryDesc;
Query *query_immv = NULL; /* rewritten IMMV query, kept for trigger creation */
Oid relationOid = InvalidOid; /* relation that is modified */
AutoStatsCmdType cmdType = AUTOSTATS_CMDTYPE_SENTINEL; /* command type */
Assert(Gp_role != GP_ROLE_EXECUTE);
/* Check if the relation exists or not */
{
Oid nspid;
Oid oldrelid;
nspid = RangeVarGetCreationNamespace(into->rel);
oldrelid = get_relname_relid(into->rel->relname, nspid);
if (OidIsValid(oldrelid))
{
if (!stmt->if_not_exists)
ereport(ERROR,
(errcode(ERRCODE_DUPLICATE_TABLE),
errmsg("relation \"%s\" already exists",
into->rel->relname)));
/*
* The relation exists and IF NOT EXISTS has been specified.
*
* If we are in an extension script, insist that the pre-existing
* object be a member of the extension, to avoid security risks.
*/
ObjectAddressSet(address, RelationRelationId, oldrelid);
checkMembershipInCurrentExtension(&address);
/* OK to skip */
ereport(NOTICE,
(errcode(ERRCODE_DUPLICATE_TABLE),
errmsg("relation \"%s\" already exists, skipping",
into->rel->relname)));
return InvalidObjectAddress;
}
}
/*
* Create the tuple receiver object and insert info it will need
*/
dest = CreateIntoRelDestReceiver(into);
/*
* The contained Query could be a SELECT, or an EXECUTE utility command.
* If the latter, we just pass it off to ExecuteQuery.
*/
if (query->commandType == CMD_UTILITY &&
IsA(query->utilityStmt, ExecuteStmt))
{
ExecuteStmt *estmt = castNode(ExecuteStmt, query->utilityStmt);
Assert(!is_matview); /* excluded by syntax */
ExecuteQuery(pstate, estmt, into, params, dest, qc);
/* get object address that intorel_initplan saved for us */
address = ((DR_intorel *) dest)->reladdr;
return address;
}
Assert(query->commandType == CMD_SELECT);
/*
* For materialized views, lock down security-restricted operations and
* arrange to make GUC variable changes local to this command. This is
* not necessary for security, but this keeps the behavior similar to
* REFRESH MATERIALIZED VIEW. Otherwise, one could create a materialized
* view not possible to refresh.
*/
if (is_matview)
{
GetUserIdAndSecContext(&save_userid, &save_sec_context);
SetUserIdAndSecContext(save_userid,
save_sec_context | SECURITY_RESTRICTED_OPERATION);
save_nestlevel = NewGUCNestLevel();
}
if (is_matview && into->ivm)
{
/* check if the query is supported in IMMV definition */
if (contain_mutable_functions((Node *) query))
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("mutable function is not supported on incrementally maintainable materialized view"),
errhint("functions must be marked IMMUTABLE")));
check_ivm_restriction((Node *) query);
/* For IMMV, we need to rewrite matview query */
query = rewriteQueryForIMMV(query, into->colNames);
/* keep a pre-QueryRewrite copy; it is used below to create the IVM triggers */
query_immv = copyObject(query);
}
{
/*
* Parse analysis was done already, but we still have to run the rule
* rewriter. We do not do AcquireRewriteLocks: we assume the query
* either came straight from the parser, or suitable locks were
* acquired by plancache.c.
*/
rewritten = QueryRewrite(query);
/* SELECT should never rewrite to more or less than one SELECT query */
if (list_length(rewritten) != 1)
elog(ERROR, "unexpected rewrite result for %s",
is_matview ? "CREATE MATERIALIZED VIEW" :
"CREATE TABLE AS SELECT");
query = linitial_node(Query, rewritten);
Assert(query->commandType == CMD_SELECT);
/* plan the query */
plan = pg_plan_query(query, pstate->p_sourcetext,
CURSOR_OPT_PARALLEL_OK, params);
/* GPDB: Save the target information in PlannedStmt */
/*
* GPDB_92_MERGE_FIXME: it really should be an optimizer's responsibility
* to correctly set the into-clause and into-policy of the PlannedStmt.
*/
plan->intoClause = copyObject(stmt->into);
/*
* Use a snapshot with an updated command ID to ensure this query sees
* results of any previously executed queries. (This could only
* matter if the planner executed an allegedly-stable function that
* changed the database contents, but let's do it anyway to be
* parallel to the EXPLAIN code path.)
*/
PushCopiedSnapshot(GetActiveSnapshot());
UpdateActiveSnapshotCommandId();
/* Create a QueryDesc, redirecting output to our tuple receiver */
queryDesc = CreateQueryDesc(plan, pstate->p_sourcetext,
GetActiveSnapshot(), InvalidSnapshot,
dest, params, queryEnv, 0);
}
/* GPDB hook for collecting query info */
if (query_info_collect_hook)
(*query_info_collect_hook)(METRICS_QUERY_SUBMIT, queryDesc);
if (into->skipData)
{
/*
* WITH NO DATA: do not start the executor. Just define the relation
* from the query's targetlist, using a code path similar to CREATE
* VIEW. Note that in this function the rewriter and planner have
* already run above by the time we get here; only execution is
* skipped.
*
* A QueryDispatchDesc is made here because create_ctas_internal
* stores the CreateStmt in queryDesc->ddesc.
*/
queryDesc->ddesc = makeNode(QueryDispatchDesc);
address = create_ctas_nodata(query->targetList, into, queryDesc);
}
else
{
check_and_unassign_from_resgroup(queryDesc->plannedstmt);
queryDesc->plannedstmt->query_mem = ResourceManagerGetQueryMemoryLimit(queryDesc->plannedstmt);
/* call ExecutorStart to prepare the plan for execution */
ExecutorStart(queryDesc, GetIntoRelEFlags(into));
/* cmdType/relationOid collected here are fed to auto_stats() further down */
if (Gp_role == GP_ROLE_DISPATCH)
autostats_get_cmdtype(queryDesc, &cmdType, &relationOid);
/* run the plan to completion */
ExecutorRun(queryDesc, ForwardScanDirection, 0L, true);
/* and clean up */
ExecutorFinish(queryDesc);
ExecutorEnd(queryDesc);
/*
* In GPDB, computing the row count needs to happen after ExecutorEnd()
* because that is where it gets the row count from dispatch. There's
* also some special processing if the relation was a replicated table.
* In upstream Postgres, the rowcount is saved before ExecutorFinish().
*/
if (into->distributedBy &&
((DistributedBy *)(into->distributedBy))->ptype == POLICYTYPE_REPLICATED)
queryDesc->es_processed /= ((DistributedBy *)(into->distributedBy))->numsegments;
/* get object address that intorel_initplan saved for us */
address = ((DR_intorel *) dest)->reladdr;
/* save the rowcount if we're given a qc to fill */
if (qc)
SetQueryCompletion(qc, CMDTAG_SELECT, queryDesc->es_processed);
/* MPP-14001: Running auto_stats */
if (Gp_role == GP_ROLE_DISPATCH)
{
bool inFunction = already_under_executor_run() || utility_nested();
auto_stats(cmdType, relationOid, queryDesc->es_processed, inFunction);
}
}
if (is_matview)
{
/* Roll back any GUC changes */
AtEOXact_GUC(false, save_nestlevel);
/* Restore userid and security context */
SetUserIdAndSecContext(save_userid, save_sec_context);
/* opened with NoLock: no additional lock is requested for the new matview here */
Oid matviewOid = address.objectId;
Relation matviewRel = table_open(matviewOid, NoLock);
/*
* Record materialized view aux entry.
* This is used to check a materialized view's meta data,
* e.g. whether its data is up to date.
* The info is used for expanding Incremental Append Agg Plan
* and Answer Query Using Materialized Views.
*/
if (IS_QD_OR_SINGLENODE())
InsertMatviewAuxEntry(matviewOid, (Query* )into->viewQuery, into->skipData);
if (into->ivm)
{
/*
* Mark relisivm field, if it's a matview and into->ivm is true.
*/
SetMatViewIVMState(matviewRel, true);
/* triggers are only created here when the view was actually populated */
if (!into->skipData)
{
Assert(query_immv != NULL);
/* Create triggers on incremental maintainable materialized view */
CreateIvmTriggersOnBaseTables(query_immv, matviewOid);
}
}
/* Set Dynamic Tables. */
if (into->dynamicTbl)
{
SetDynamicTableState(matviewRel);
create_dynamic_table_auto_refresh_task(pstate, matviewRel, into->schedule);
}
table_close(matviewRel, NoLock);
}
/* release the receiver and QueryDesc, and pop the snapshot pushed above */
{
dest->rDestroy(dest);
FreeQueryDesc(queryDesc);
PopActiveSnapshot();
}
return address;
}
/*
 * rewriteQueryForIMMV -- rewrite view definition query for IMMV
 *
 * Works on a copy of "query" and returns that copy; the input is not
 * modified.  The rewrite
 *	- rejects GROUP BY keys that are not part of the select list,
 *	- turns a plain DISTINCT (no aggregates) into an equivalent GROUP BY,
 *	- appends hidden helper columns for each aggregate in the targetlist
 *	  (see makeIvmAggColumn), named after the user-visible column name, and
 *	- appends a count(*) column named "__ivm_count__" when the query has
 *	  DISTINCT or aggregates.
 *
 * colNames is the optional explicit column name list of the view.
 */
Query *
rewriteQueryForIMMV(Query *query, List *colNames)
{
	ParseState *pstate = make_parsestate(NULL);
	Query	   *result = copyObject(query);

	pstate->p_expr_kind = EXPR_KIND_SELECT_TARGET;

	if (result->groupClause)
	{
		ListCell   *group_cell;

		/* every grouping key has to be visible in the targetlist */
		foreach(group_cell, result->groupClause)
		{
			SortGroupClause *sgc = (SortGroupClause *) lfirst(group_cell);
			TargetEntry *key = get_sortgroupclause_tle(sgc, result->targetList);

			if (key->resjunk)
				ereport(ERROR,
						(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
						 errmsg("GROUP BY expression not appearing in select list is not supported on incrementally maintainable materialized view")));
		}
	}
	else if (result->distinctClause && !result->hasAggs)
	{
		/* DISTINCT becomes GROUP BY; the count(*) column is added below */
		result->groupClause = transformDistinctClause(NULL,
													  &result->targetList,
													  result->sortClause,
													  false);
	}

	/* hidden helper columns for aggregate values */
	if (result->hasAggs)
	{
		List	   *hidden = NIL;
		AttrNumber	resno = list_length(result->targetList) + 1;
		int			n_names = list_length(colNames);
		ListCell   *tle_cell;

		foreach(tle_cell, result->targetList)
		{
			TargetEntry *entry = (TargetEntry *) lfirst(tle_cell);
			char	   *name;

			/* prefer the explicit column name when one covers this position */
			if (colNames == NIL || foreach_current_index(tle_cell) >= n_names)
				name = entry->resname;
			else
				name = strVal(list_nth(colNames, entry->resno - 1));

			if (IsA(entry->expr, Aggref))
				makeIvmAggColumn(pstate, (Aggref *) entry->expr, name, &resno, &hidden);
		}

		result->targetList = list_concat(result->targetList, hidden);
	}

	/* count(*) used for counting distinct tuples in the view */
	if (result->distinctClause || result->hasAggs)
	{
		FuncCall   *count_call;
		Node	   *count_expr;
		TargetEntry *count_tle;

		count_call = makeFuncCall(SystemFuncName("count"), NIL, COERCE_EXPLICIT_CALL, -1);
		count_call->agg_star = true;
		count_expr = ParseFuncOrColumn(pstate, count_call->funcname, NIL, NULL,
									   count_call, false, -1);

		count_tle = makeTargetEntry((Expr *) count_expr,
									list_length(result->targetList) + 1,
									pstrdup("__ivm_count__"),
									false);
		result->targetList = lappend(result->targetList, count_tle);
		result->hasAggs = true;
	}

	return result;
}
/*
 * makeIvmAggColumn -- make additional aggregate columns for IVM
 *
 * For the aggregate "aggref", build the hidden helper aggregates needed to
 * maintain its value incrementally and append them, as TargetEntry nodes, to
 * *aggs.  Each new entry takes resno *next_resno, which is advanced by one
 * per entry added.
 *
 *	- any aggregate other than count: a count() over the same arguments,
 *	  named from "__ivm_count" and resname;
 *	- avg additionally: a sum() over the same arguments, named from
 *	  "__ivm_sum" and resname.
 *
 * The helper aggregates are first resolved against dummy constant arguments
 * and then have their argument list replaced by the original aggregate's.
 *
 * XXX: if an identical expression already appears in the target list it
 * could be reused instead of adding a duplicate.
 */
void
makeIvmAggColumn(ParseState *pstate, Aggref *aggref, char *resname, AttrNumber *next_resno, List **aggs)
{
	Const	   *dummy_int = makeConst(INT4OID,
									  -1,
									  InvalidOid,
									  sizeof(int32),
									  Int32GetDatum(1),
									  false,
									  true);	/* pass by value */
	const char *agg_name = get_func_name(aggref->aggfnoid);
	bool		need_count = (strcmp(agg_name, "count") != 0);
	bool		need_sum = (strcmp(agg_name, "avg") == 0);

	/* count(): tells whether the aggregate value should be NULL */
	if (need_count)
	{
		FuncCall   *call = makeFuncCall(SystemFuncName("count"), NIL, COERCE_EXPLICIT_CALL, -1);
		Node	   *expr;
		TargetEntry *entry;

		/* resolve with a dummy argument, then swap in the real ones */
		expr = ParseFuncOrColumn(pstate, call->funcname, list_make1(dummy_int),
								 NULL, call, false, -1);
		((Aggref *) expr)->args = aggref->args;

		entry = makeTargetEntry((Expr *) expr,
								*next_resno,
								pstrdup(makeObjectName("__ivm_count",resname, "_")),
								false);
		*aggs = lappend(*aggs, entry);
		*next_resno += 1;
	}

	/* sum(): avg is maintained as sum/count */
	if (need_sum)
	{
		List	   *dummy_args = NIL;
		ListCell   *type_cell;
		FuncCall   *call;
		Node	   *expr;
		TargetEntry *entry;

		/* one NULL constant of the matching type per aggregate argument */
		foreach(type_cell, aggref->aggargtypes)
		{
			Oid			arg_typid = lfirst_oid(type_cell);
			Type		arg_type = typeidType(arg_typid);
			Const	   *null_arg;

			null_arg = makeConst(arg_typid,
								 -1,
								 typeTypeCollation(arg_type),
								 typeLen(arg_type),
								 (Datum) 0,
								 true,
								 typeByVal(arg_type));
			dummy_args = lappend(dummy_args, null_arg);
			ReleaseSysCache(arg_type);
		}

		call = makeFuncCall(SystemFuncName("sum"), NIL, COERCE_EXPLICIT_CALL, -1);

		/* resolve with the dummy arguments, then swap in the real ones */
		expr = ParseFuncOrColumn(pstate, call->funcname, dummy_args,
								 NULL, call, false, -1);
		((Aggref *) expr)->args = aggref->args;

		entry = makeTargetEntry((Expr *) expr,
								*next_resno,
								pstrdup(makeObjectName("__ivm_sum",resname, "_")),
								false);
		*aggs = lappend(*aggs, entry);
		*next_resno += 1;
	}
}
/*
 * GetIntoRelEFlags --- compute executor flags needed for CREATE TABLE AS
 *
 * Returns EXEC_FLAG_WITH_NO_DATA when the INTO clause asks for WITH NO DATA,
 * and 0 otherwise.
 *
 * Exported because EXPLAIN and PREPARE need it as well.  Those callers still
 * have to handle skipData themselves, since each suppresses execution in its
 * own way.
 */
int
GetIntoRelEFlags(IntoClause *intoClause)
{
	return intoClause->skipData ? EXEC_FLAG_WITH_NO_DATA : 0;
}
/*
 * CreateTableAsRelExists --- check existence of relation for CreateTableAsStmt
 *
 * Looks up the relation named by the statement's INTO clause in its creation
 * namespace.  Returns false if no such relation exists.  If one exists,
 * raises an error unless IF NOT EXISTS was given, in which case a NOTICE is
 * emitted and true is returned.
 */
bool
CreateTableAsRelExists(CreateTableAsStmt *ctas)
{
	IntoClause *target = ctas->into;
	Oid			schema_oid = RangeVarGetCreationNamespace(target->rel);

	/* nothing by that name: the relation can be created */
	if (!get_relname_relid(target->rel->relname, schema_oid))
		return false;

	if (!ctas->if_not_exists)
		ereport(ERROR,
				(errcode(ERRCODE_DUPLICATE_TABLE),
				 errmsg("relation \"%s\" already exists",
						target->rel->relname)));

	/* relation exists and IF NOT EXISTS was specified */
	ereport(NOTICE,
			(errcode(ERRCODE_DUPLICATE_TABLE),
			 errmsg("relation \"%s\" already exists, skipping",
					target->rel->relname)));
	return true;
}
/*
 * CreateIntoRelDestReceiver -- create a suitable DestReceiver object
 *
 * Allocates a zeroed DR_intorel, wires up its callbacks and returns it as a
 * DestReceiver.  intoClause may be NULL (as when called from
 * CreateDestReceiver()), in which case the caller must fill in the INTO
 * clause later; other callers can conveniently pass it right away.
 */
DestReceiver *
CreateIntoRelDestReceiver(IntoClause *intoClause)
{
	DR_intorel *receiver = (DR_intorel *) palloc0(sizeof(DR_intorel));
	DestReceiver *pub = &receiver->pub;

	pub->mydest = DestIntoRel;
	pub->rStartup = intorel_startup_dummy;
	pub->receiveSlot = intorel_receive;
	pub->rShutdown = intorel_shutdown;
	pub->rDestroy = intorel_destroy;

	receiver->into = intoClause;

	/* pub is the first member, so this is the same address as receiver */
	return pub;
}
/*
 * intorel_startup_dummy --- executor startup
 *
 * The target relation is created and opened by intorel_initplan() (see the
 * explanation there), so all that is done here is to initialize DML state
 * for inserting into it.  The operation and typeinfo arguments are unused.
 */
static void
intorel_startup_dummy(DestReceiver *self, int operation, TupleDesc typeinfo)
{
	DR_intorel *state = (DR_intorel *) self;

	table_dml_init(state->rel, CMD_INSERT);
}
/*
* intorel_initplan --- Based on PG intorel_startup().
* Parameters are different. We need to run the code earlier before the
* executor runs since we want the relation to be created earlier else current
* MPP framework will fail. This could be called in InitPlan() as before, but
* we could call it just before ExecutorRun() in ExecCreateTableAs(). In the
* future if the requirement is general we could add an interface into
* DestReceiver but so far that is not needed (Based on PG 11 code.)
*
* queryDesc: its plannedstmt->intoClause describes the target, its tupDesc
*            supplies the column types, and its dest is the receiver whose
*            private fields are filled in here (dest is replaced by a fresh
*            DR_intorel receiver if it is not one already).
* eflags:    executor flags; with EXEC_FLAG_EXPLAIN_ONLY nothing is created.
*
* On return (when not skipped) the target relation exists, is open and locked
* with AccessExclusiveLock, and is recorded in the receiver.
*/
void
intorel_initplan(struct QueryDesc *queryDesc, int eflags)
{
DR_intorel *myState;
/* Get 'into' from the dispatched plan */
IntoClause *into = queryDesc->plannedstmt->intoClause;
bool is_matview;
List *attrList;
ObjectAddress intoRelationAddr;
Relation intoRelationDesc;
ListCell *lc;
int attnum;
TupleDesc typeinfo = queryDesc->tupDesc;
/*
 * Skip creating the "into" relation for EXPLAIN-only runs, and on a QE
 * that is not a writer.
 */
if ((eflags & EXEC_FLAG_EXPLAIN_ONLY) ||
(Gp_role == GP_ROLE_EXECUTE && !Gp_is_writer))
return;
/* This code supports both CREATE TABLE AS and CREATE MATERIALIZED VIEW */
is_matview = (into->viewQuery != NULL);
/*
* Build column definitions using "pre-cooked" type and collation info. If
* a column name list was specified in CREATE TABLE AS, override the
* column names derived from the query. (Too few column names are OK, too
* many are not.)
*/
attrList = NIL;
lc = list_head(into->colNames);
for (attnum = 0; attnum < typeinfo->natts; attnum++)
{
Form_pg_attribute attribute = TupleDescAttr(typeinfo, attnum);
ColumnDef *col;
char *colname;
/*
 * Don't override hidden columns added for IVM: they keep their generated
 * name and do not consume an entry of the user's column name list.
 */
if (lc && !isIvmName(NameStr(attribute->attname)))
{
colname = strVal(lfirst(lc));
lc = lnext(into->colNames, lc);
}
else
colname = NameStr(attribute->attname);
col = makeColumnDef(colname,
attribute->atttypid,
attribute->atttypmod,
attribute->attcollation);
/*
* It's possible that the column is of a collatable type but the
* collation could not be resolved, so double-check. (We must check
* this here because DefineRelation would adopt the type's default
* collation rather than complaining.)
*/
if (!OidIsValid(col->collOid) &&
type_is_collatable(col->typeName->typeOid))
ereport(ERROR,
(errcode(ERRCODE_INDETERMINATE_COLLATION),
errmsg("no collation was derived for column \"%s\" with collatable type %s",
col->colname,
format_type_be(col->typeName->typeOid)),
errhint("Use the COLLATE clause to set the collation explicitly.")));
attrList = lappend(attrList, col);
}
/* names left over means more names were given than there are columns */
if (lc != NULL)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("too many column names were specified")));
/*
* Actually create the target table
* We also get here with CREATE TABLE AS EXECUTE ... WITH NO DATA. In that
* case, dispatch the creation of the table immediately. Normally, the table
* is created in the initialization of the plan in QEs, but with NO DATA, we
* don't need to dispatch the plan during ExecutorStart().
*/
intoRelationAddr = create_ctas_internal(attrList, into, queryDesc,
into->skipData ? true : false);
/*
* Finally we can open the target table
*/
intoRelationDesc = table_open(intoRelationAddr.objectId, AccessExclusiveLock);
/*
* Make sure the constructed table does not have RLS enabled.
*
* check_enable_rls() will ereport(ERROR) itself if the user has requested
* something invalid, and otherwise will return RLS_ENABLED if RLS should
* be enabled here. We don't actually support that currently, so throw
* our own ereport(ERROR) if that happens.
*/
if (check_enable_rls(intoRelationAddr.objectId, InvalidOid, false) == RLS_ENABLED)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("policies not yet implemented for this command")));
/*
* Tentatively mark the target as populated, if it's a matview and we're
* going to fill it; otherwise, no change needed.
*/
if (is_matview && !into->skipData)
SetMatViewPopulatedState(intoRelationDesc, true);
/*
* Fill private fields of myState for use by later routines.  If the
* QueryDesc's receiver is not a DR_intorel, install a new one first.
*/
if (queryDesc->dest->mydest != DestIntoRel)
queryDesc->dest = CreateIntoRelDestReceiver(into);
myState = (DR_intorel *) queryDesc->dest;
myState->rel = intoRelationDesc;
myState->reladdr = intoRelationAddr;
myState->output_cid = GetCurrentCommandId(true);
myState->ti_options = TABLE_INSERT_SKIP_FSM;
/*
* If WITH NO DATA is specified, there is no need to set up the state for
* bulk inserts as there are no tuples to insert.
*/
if (!into->skipData)
myState->bistate = GetBulkInsertState();
else
myState->bistate = NULL;
/*
* Valid smgr_targblock implies something already wrote to the relation.
* This may be harmless, but this function hasn't planned for it.
*/
Assert(RelationGetTargetBlock(intoRelationDesc) == InvalidBlockNumber);
}
/*
 * intorel_receive --- receive one tuple
 *
 * Inserts the slot's tuple into the target relation, unless WITH NO DATA was
 * specified, in which case the tuple is ignored.  Always returns true.
 */
static bool
intorel_receive(TupleTableSlot *slot, DestReceiver *self)
{
	DR_intorel *state = (DR_intorel *) self;

	/* WITH NO DATA: there is nothing to store */
	if (state->into->skipData)
		return true;

	/*
	 * The incoming slot need not be of the target relation's slot type.
	 * table_tuple_insert() copes with that, at some cost in efficiency;
	 * converting to a matching slot first would not be cheap either.  It
	 * also means per-AM data (say a tuple's xmin) cannot be accessed, but
	 * nothing here needs that.
	 */
	table_tuple_insert(state->rel,
					   slot,
					   state->output_cid,
					   state->ti_options,
					   state->bistate);

	/* the relation was just created, so there are no indexes to update */
	return true;
}
/*
 * intorel_shutdown --- executor end
 *
 * Finishes the bulk insert (if data was loaded) and closes the target
 * relation.  Does nothing when the receiver holds no open relation, which
 * also makes a repeated call harmless.
 */
static void
intorel_shutdown(DestReceiver *self)
{
	DR_intorel *state = (DR_intorel *) self;

	/* no relation attached (never opened, or already shut down) */
	if (state->rel == NULL)
		return;

	/* bulk-insert state only exists when tuples were being loaded */
	if (!state->into->skipData)
	{
		FreeBulkInsertState(state->bistate);
		table_finish_bulk_insert(state->rel, state->ti_options);
	}

	/* close rel, but keep lock until commit */
	table_close(state->rel, NoLock);
	state->rel = NULL;
}
/*
 * intorel_destroy --- release DestReceiver object
 *
 * Frees the receiver's own memory only.
 */
static void
intorel_destroy(DestReceiver *self)
{
	DR_intorel *state = (DR_intorel *) self;

	pfree(state);
}
/*
 * Get the OID of the relation created for SELECT INTO or CREATE TABLE AS.
 *
 * To be called between ExecutorStart and ExecutorEnd.
 *
 * Returns InvalidOid if the QueryDesc has no receiver, if its receiver is
 * not a DestIntoRel receiver, or if that receiver has no open target
 * relation.
 */
Oid
GetIntoRelOid(QueryDesc *queryDesc)
{
	DR_intorel *myState = (DR_intorel *) queryDesc->dest;

	/*
	 * Only touch DR_intorel's private fields once we know the receiver exists
	 * and really is a DR_intorel.  Reading ->rel before these checks would
	 * dereference a NULL pointer, or read a field that a receiver of another
	 * type does not have.
	 */
	if (myState == NULL || myState->pub.mydest != DestIntoRel)
		return InvalidOid;

	if (myState->rel == NULL)
		return InvalidOid;

	return RelationGetRelid(myState->rel);
}
/*
 * CreateIvmTriggersOnBaseTables -- create IVM triggers on all base tables
 *
 * Walks the join tree of "qry" and creates the IVM triggers for matview
 * "matviewOid" on each base relation it references.  A query with an empty
 * range table is left alone.
 */
void
CreateIvmTriggersOnBaseTables(Query *qry, Oid matviewOid)
{
	Relids		seen = NULL;
	int			n_rtes = list_length(qry->rtable);
	RangeTblEntry *first_rte;
	bool		need_ex_lock;

	/* no base tables at all: nothing to do */
	if (n_rtes < 1)
		return;

	/*
	 * A view over more than one base table must be maintained serially, or
	 * concurrent transactions modifying different base tables could leave it
	 * inconsistent; that calls for an exclusive lock on the view.  With a
	 * single plain table a weaker lock suffices.
	 *
	 * The lock type is decided now rather than at maintenance time:
	 * inspecting the view definition then would require taking a weaker lock
	 * first, and upgrading it afterwards raises the chance of deadlock.
	 */
	first_rte = list_nth(qry->rtable, 0);
	need_ex_lock = (n_rtes > 1 || first_rte->rtekind != RTE_RELATION);

	CreateIvmTriggersOnBaseTablesRecurse(qry, (Node *) qry, matviewOid, &seen, need_ex_lock);

	bms_free(seen);
}
/*
 * CreateIvmTriggersOnBaseTablesRecurse
 *
 * Walk a Query / FromExpr / JoinExpr / RangeTblRef tree and create the IVM
 * triggers on every plain relation that is referenced.  *relids remembers
 * the relations already handled so each one gets its triggers only once.
 * Any other node type is reported as an error.
 *
 * NOTE(review): relation OIDs are stored as Bitmapset members here; this
 * looks unsafe for very large OID values -- confirm against the Bitmapset
 * API before relying on it.
 */
static void
CreateIvmTriggersOnBaseTablesRecurse(Query *qry, Node *node, Oid matviewOid,
									 Relids *relids, bool ex_lock)
{
	/* Trigger timings and events, listed in the order they are created. */
	static const int16 ivm_timings[] = {
		TRIGGER_TYPE_BEFORE,
		TRIGGER_TYPE_AFTER
	};
	static const int16 ivm_events[] = {
		TRIGGER_TYPE_INSERT,
		TRIGGER_TYPE_DELETE,
		TRIGGER_TYPE_UPDATE,
		TRIGGER_TYPE_TRUNCATE
	};

	if (node == NULL)
		return;

	/* This can recurse, so check for excessive recursion */
	check_stack_depth();

	switch (nodeTag(node))
	{
		case T_Query:
			/* Only the join tree is of interest at the query level. */
			CreateIvmTriggersOnBaseTablesRecurse(qry,
												 (Node *) ((Query *) node)->jointree,
												 matviewOid, relids, ex_lock);
			break;

		case T_RangeTblRef:
			{
				RangeTblRef *rtr = (RangeTblRef *) node;
				RangeTblEntry *rte = rt_fetch(rtr->rtindex, qry->rtable);
				int			t;
				int			e;

				/* Skip non-relations and relations we have already seen. */
				if (rte->rtekind != RTE_RELATION ||
					bms_is_member(rte->relid, *relids))
					break;

				/* All BEFORE triggers first, then all AFTER triggers. */
				for (t = 0; t < (int) lengthof(ivm_timings); t++)
				{
					for (e = 0; e < (int) lengthof(ivm_events); e++)
					{
						int16		event = ivm_events[e];

						/* TRUNCATE triggers always ask for the exclusive lock. */
						CreateIvmTrigger(rte->relid, matviewOid,
										 event, ivm_timings[t],
										 (event == TRIGGER_TYPE_TRUNCATE) ? true : ex_lock);
					}
				}

				*relids = bms_add_member(*relids, rte->relid);
			}
			break;

		case T_FromExpr:
			{
				FromExpr   *from = (FromExpr *) node;
				ListCell   *cell;

				foreach(cell, from->fromlist)
				{
					CreateIvmTriggersOnBaseTablesRecurse(qry, lfirst(cell),
														 matviewOid, relids, ex_lock);
				}
			}
			break;

		case T_JoinExpr:
			{
				JoinExpr   *join = (JoinExpr *) node;

				CreateIvmTriggersOnBaseTablesRecurse(qry, join->larg,
													 matviewOid, relids, ex_lock);
				CreateIvmTriggersOnBaseTablesRecurse(qry, join->rarg,
													 matviewOid, relids, ex_lock);
			}
			break;

		default:
			elog(ERROR, "unrecognized node type: %d", (int) nodeTag(node));
	}
}
/*
 * CreateIvmTrigger -- create IVM trigger on a base table
 *
 * relOid is the base table and viewOid the materialized view the trigger
 * belongs to.  type is one of TRIGGER_TYPE_INSERT/DELETE/UPDATE/TRUNCATE and
 * timing is TRIGGER_TYPE_BEFORE or TRIGGER_TYPE_AFTER.  ex_lock is passed to
 * the trigger function as its second argument (it is forced to true for
 * DELETE and UPDATE, see below).
 *
 * The trigger is a statement-level trigger named
 * "IVM_trigger_<event>_<timing>_<viewOid>".  A DEPENDENCY_AUTO dependency of
 * the trigger on the view is recorded, and the statement is dispatched when
 * running as the dispatcher.
 */
static void
CreateIvmTrigger(Oid relOid, Oid viewOid, int16 type, int16 timing, bool ex_lock)
{
	ObjectAddress view_addr;
	ObjectAddress trig_addr;
	CreateTrigStmt *stmt;
	List	   *transitions = NIL;
	const char *base_name = NULL;
	char		full_name[NAMEDATALEN];
	char	   *view_arg;
	char	   *lock_arg;
	bool		is_before;

	Assert(timing == TRIGGER_TYPE_BEFORE || timing == TRIGGER_TYPE_AFTER);
	is_before = (timing == TRIGGER_TYPE_BEFORE);

	view_addr.classId = RelationRelationId;
	view_addr.objectId = viewOid;
	view_addr.objectSubId = 0;

	stmt = makeNode(CreateTrigStmt);
	stmt->relation = makeRangeVar(get_namespace_name(get_rel_namespace(relOid)),
								  get_rel_name(relOid),
								  -1);
	stmt->row = false;
	stmt->matviewId = viewOid;
	stmt->timing = timing;
	stmt->events = type;

	/* Pick the event/timing specific part of the trigger name. */
	if (type == TRIGGER_TYPE_INSERT)
		base_name = is_before ? "IVM_trigger_ins_before" : "IVM_trigger_ins_after";
	else if (type == TRIGGER_TYPE_DELETE)
		base_name = is_before ? "IVM_trigger_del_before" : "IVM_trigger_del_after";
	else if (type == TRIGGER_TYPE_UPDATE)
		base_name = is_before ? "IVM_trigger_upd_before" : "IVM_trigger_upd_after";
	else if (type == TRIGGER_TYPE_TRUNCATE)
		base_name = is_before ? "IVM_trigger_truncate_before" : "IVM_trigger_truncate_after";
	else
		elog(ERROR, "unsupported trigger type");

	/* The view OID is appended to the name. */
	snprintf(full_name, sizeof(full_name),
			 "%s_%u", base_name, viewOid);
	stmt->trigname = pstrdup(full_name);

	/* AFTER triggers get transition tables for the rows they changed. */
	if (timing == TRIGGER_TYPE_AFTER)
	{
		bool		want_new = (type == TRIGGER_TYPE_INSERT || type == TRIGGER_TYPE_UPDATE);
		bool		want_old = (type == TRIGGER_TYPE_DELETE || type == TRIGGER_TYPE_UPDATE);

		if (want_new)
		{
			TriggerTransition *newtab = makeNode(TriggerTransition);

			newtab->name = "__ivm_newtable";
			newtab->isNew = true;
			newtab->isTable = true;
			transitions = lappend(transitions, newtab);
		}
		if (want_old)
		{
			TriggerTransition *oldtab = makeNode(TriggerTransition);

			oldtab->name = "__ivm_oldtable";
			oldtab->isNew = false;
			oldtab->isTable = true;
			transitions = lappend(transitions, oldtab);
		}
	}

	/*
	 * XXX: When using DELETE or UPDATE, we must use exclusive lock for now
	 * because apply_old_delta(_with_count) uses ctid to identify the tuple
	 * to be deleted/updated, which doesn't work in concurrent situations.
	 *
	 * If the view doesn't have aggregate, distinct, or tuple duplicate, then
	 * it would work even in concurrent situations.  However, we don't have
	 * any way to guarantee the view has a unique key before opening the IMMV
	 * at maintenance time, because users may drop the unique index.
	 */
	ex_lock = ex_lock ||
		type == TRIGGER_TYPE_DELETE ||
		type == TRIGGER_TYPE_UPDATE;

	if (is_before)
		stmt->funcname = SystemFuncName("ivm_immediate_before");
	else
		stmt->funcname = SystemFuncName("ivm_immediate_maintenance");

	stmt->columns = NIL;
	stmt->transitionRels = transitions;
	stmt->whenClause = NULL;
	stmt->isconstraint = false;
	stmt->deferrable = false;
	stmt->initdeferred = false;
	stmt->constrrel = NULL;

	/* Trigger arguments: the view OID and the lock flag, both as text. */
	view_arg = DatumGetPointer(DirectFunctionCall1(oidout, ObjectIdGetDatum(viewOid)));
	lock_arg = DatumGetPointer(DirectFunctionCall1(boolout, BoolGetDatum(ex_lock)));
	stmt->args = list_make2(makeString(view_arg), makeString(lock_arg));

	trig_addr = CreateTrigger(stmt, NULL, relOid, InvalidOid, InvalidOid,
							  InvalidOid, InvalidOid, InvalidOid, NULL, false, false);

	recordDependencyOn(&trig_addr, &view_addr, DEPENDENCY_AUTO);

	if (Gp_role == GP_ROLE_DISPATCH && ENABLE_DISPATCH())
	{
		CdbDispatchUtilityStatement((Node *) stmt,
									DF_CANCEL_ON_ERROR |
									DF_WITH_SNAPSHOT |
									DF_NEED_TWO_PHASE,
									GetAssignedOidsForDispatch(),
									NULL);
	}

	/* Make changes-so-far visible */
	CommandCounterIncrement();
}
/*
 * check_ivm_restriction --- look for specific nodes in the query tree
 *
 * Entry point for check_ivm_restriction_walker: the walker is started on
 * node with a freshly initialized context.
 */
static void
check_ivm_restriction(Node *node)
{
	check_ivm_restriction_context walker_state = {false};

	(void) check_ivm_restriction_walker(node, &walker_state);
}
/*
 * check_ivm_restriction_walker
 *
 * Tree walker that raises ERRCODE_FEATURE_NOT_SUPPORTED for every construct
 * checked below (the view being validated is an incrementally maintainable
 * materialized view, per the error messages).
 *
 * node is the current node; context carries has_agg, which is set once a
 * visited Query reports aggregates and is consulted when target entries are
 * checked.
 *
 * The function always returns false, so the walk is never cut short;
 * violations are reported solely through ereport(ERROR).  The first failing
 * check in the order written below determines the message that is raised.
 */
static bool
check_ivm_restriction_walker(Node *node, check_ivm_restriction_context *context)
{
if (node == NULL)
return false;
/*
 * We currently don't support Sub-Query.
 */
if (IsA(node, SubPlan) || IsA(node, SubLink))
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("subquery is not supported on incrementally maintainable materialized view")));
/* This can recurse, so check for excessive recursion */
check_stack_depth();
switch (nodeTag(node))
{
case T_Query:
{
Query *qry = (Query *)node;
ListCell *lc;
List *vars;
/* Query-level clauses that are rejected outright. */
/* if contained CTE, return error */
if (qry->cteList != NIL)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("CTE is not supported on incrementally maintainable materialized view")));
if (qry->groupClause != NIL && !qry->hasAggs)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("GROUP BY clause without aggregate is not supported on incrementally maintainable materialized view")));
if (qry->havingQual != NULL)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("HAVING clause is not supported on incrementally maintainable materialized view")));
if (qry->sortClause != NIL) /* There is a possibility that we don't need to return an error */
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("ORDER BY clause is not supported on incrementally maintainable materialized view")));
if (qry->limitOffset != NULL || qry->limitCount != NULL)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("LIMIT/OFFSET clause is not supported on incrementally maintainable materialized view")));
if (qry->hasDistinctOn)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("DISTINCT ON is not supported on incrementally maintainable materialized view")));
if (qry->hasWindowFuncs)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("window functions are not supported on incrementally maintainable materialized view")));
if (qry->groupingSets != NIL)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("GROUPING SETS, ROLLUP, or CUBE clauses is not supported on incrementally maintainable materialized view")));
if (qry->setOperations != NULL)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("UNION/INTERSECT/EXCEPT statements are not supported on incrementally maintainable materialized view")));
if (list_length(qry->targetList) == 0)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("empty target list is not supported on incrementally maintainable materialized view")));
if (qry->rowMarks != NIL)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("FOR UPDATE/SHARE clause is not supported on incrementally maintainable materialized view")));
/* system column restrictions */
vars = pull_vars_of_level((Node *) qry, 0);
foreach(lc, vars)
{
if (IsA(lfirst(lc), Var))
{
Var *var = (Var *) lfirst(lc);
/* if system column, return error */
if (var->varattno < 0)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("system column is not supported on incrementally maintainable materialized view")));
}
}
/* Remember that aggregates were seen; used by the T_TargetEntry case. */
context->has_agg |= qry->hasAggs;
/* restrictions for rtable */
foreach(lc, qry->rtable)
{
RangeTblEntry *rte = (RangeTblEntry *) lfirst(lc);
if (rte->subquery)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("subquery is not supported on incrementally maintainable materialized view")));
if (rte->tablesample != NULL)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("TABLESAMPLE clause is not supported on incrementally maintainable materialized view")));
if (rte->relkind == RELKIND_PARTITIONED_TABLE)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("partitioned table is not supported on incrementally maintainable materialized view")));
if (rte->relkind == RELKIND_RELATION && has_superclass(rte->relid))
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("partitions is not supported on incrementally maintainable materialized view")));
if (rte->relkind == RELKIND_RELATION && find_inheritance_children(rte->relid, NoLock) != NIL)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("inheritance parent is not supported on incrementally maintainable materialized view")));
if (rte->relkind == RELKIND_FOREIGN_TABLE)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("foreign table is not supported on incrementally maintainable materialized view")));
if (rte->relkind == RELKIND_VIEW ||
rte->relkind == RELKIND_MATVIEW)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("VIEW or MATERIALIZED VIEW is not supported on incrementally maintainable materialized view")));
if (rte->rtekind == RTE_VALUES)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("VALUES is not supported on incrementally maintainable materialized view")));
/*
 * The append-optimized check needs the relation descriptor, so the
 * relation is opened (without taking a lock) just for this test.
 */
if (rte->relid != InvalidOid)
{
Relation rel = table_open(rte->relid, NoLock);
if (RelationIsAppendOptimized(rel))
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("append-optimized table is not supported on incrementally maintainable materialized view")));
table_close(rel, NoLock);
}
}
/*
 * Recurse into the rest of the query.  The range table is skipped
 * (QTW_IGNORE_RANGE_TABLE); its entries were checked by the loop above.
 */
query_tree_walker(qry, check_ivm_restriction_walker, (void *) context, QTW_IGNORE_RANGE_TABLE);
break;
}
case T_TargetEntry:
{
TargetEntry *tle = (TargetEntry *)node;
/* Column names matched by isIvmName() are refused. */
if (isIvmName(tle->resname))
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("column name %s is not supported on incrementally maintainable materialized view", tle->resname)));
/*
 * Once aggregates have been seen, a target expression may be a bare
 * Aggref but must not merely contain an aggregate somewhere inside.
 */
if (context->has_agg && !IsA(tle->expr, Aggref) && contain_aggs_of_level((Node *) tle->expr, 0))
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("expression containing an aggregate in it is not supported on incrementally maintainable materialized view")));
expression_tree_walker(node, check_ivm_restriction_walker, (void *) context);
break;
}
case T_JoinExpr:
{
JoinExpr *joinexpr = (JoinExpr *)node;
/* Any join type ordered after JOIN_INNER is reported as an outer join. */
if (joinexpr->jointype > JOIN_INNER)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("OUTER JOIN is not supported on incrementally maintainable materialized view")));
expression_tree_walker(node, check_ivm_restriction_walker, (void *) context);
break;
}
case T_Aggref:
{
/* Check if this supports IVM */
Aggref *aggref = (Aggref *) node;
/* Function signature text, used only by the last error message below. */
const char *aggname = format_procedure(aggref->aggfnoid);
if (aggref->aggfilter != NULL)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("aggregate function with FILTER clause is not supported on incrementally maintainable materialized view")));
if (aggref->aggdistinct != NULL)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("aggregate function with DISTINCT arguments is not supported on incrementally maintainable materialized view")));
if (aggref->aggorder != NULL)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("aggregate function with ORDER clause is not supported on incrementally maintainable materialized view")));
if (!check_aggregate_supports_ivm(aggref->aggfnoid))
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("aggregate function %s is not supported on incrementally maintainable materialized view", aggname)));
/* Note: the walk does not descend into the aggregate's own sub-nodes. */
break;
}
default:
/* Nothing to check on this node itself; just visit its children. */
expression_tree_walker(node, check_ivm_restriction_walker, (void *) context);
break;
}
return false;
}
/*
 * check_aggregate_supports_ivm
 *
 * Report whether the aggregate function identified by aggfnoid is on the
 * list of aggregates accepted for IVM: the count, sum and avg functions
 * enumerated below.  Every other function OID yields false.
 */
static bool
check_aggregate_supports_ivm(Oid aggfnoid)
{
	bool		supported;

	switch (aggfnoid)
	{
			/* count */
		case F_COUNT_ANY:
		case F_COUNT_:

			/* sum */
		case F_SUM_INT8:
		case F_SUM_INT4:
		case F_SUM_INT2:
		case F_SUM_FLOAT4:
		case F_SUM_FLOAT8:
		case F_SUM_MONEY:
		case F_SUM_INTERVAL:
		case F_SUM_NUMERIC:

			/* avg */
		case F_AVG_INT8:
		case F_AVG_INT4:
		case F_AVG_INT2:
		case F_AVG_NUMERIC:
		case F_AVG_FLOAT4:
		case F_AVG_FLOAT8:
		case F_AVG_INTERVAL:
			supported = true;
			break;

		default:
			supported = false;
			break;
	}

	return supported;
}
/*
* CreateIndexOnIMMV
*
* Create a unique index on incremental maintainable materialized view.
* If the view definition query has a GROUP BY clause, the index is created
* on the columns of GROUP BY expressions. Otherwise, if the view contains
* all primary key attritubes of its base tables in the target list, the index
* is created on these attritubes. In other cases, no index is created.
*/
void
CreateIndexOnIMMV(Query *query, Relation matviewRel)
{
ListCell *lc;
IndexStmt *index;
ObjectAddress address;
List *constraintList = NIL;
char idxname[NAMEDATALEN];
List *indexoidlist = RelationGetIndexList(matviewRel);
ListCell *indexoidscan;
snprintf(idxname, sizeof(idxname), "%s_index", RelationGetRelationName(matviewRel));
index = makeNode(IndexStmt);
index->unique = true;
index->primary = false;
index->isconstraint = false;
index->deferrable = false;
index->initdeferred = false;
index->idxname = idxname;
index->relation =
makeRangeVar(get_namespace_name(RelationGetNamespace(matviewRel)),
pstrdup(RelationGetRelationName(matviewRel)),
-1);
index->accessMethod = DEFAULT_INDEX_TYPE;
index->options = NIL;
index->tableSpace = get_tablespace_name(matviewRel->rd_rel->reltablespace);
index->whereClause = NULL;
index->indexParams = NIL;
index->indexIncludingParams = NIL;
index->excludeOpNames = NIL;
index->idxcomment = NULL;
index->indexOid = InvalidOid;
index->oldNode = InvalidOid;
index->oldCreateSubid = InvalidSubTransactionId;
index->oldFirstRelfilenodeSubid = InvalidSubTransactionId;
index->transformed = true;
index->concurrent = false;
index->if_not_exists = false;
if (query->groupClause)
{
/* create unique constraint on GROUP BY expression columns */
foreach(lc, query->groupClause)
{
SortGroupClause *scl = (SortGroupClause *) lfirst(lc);
TargetEntry *tle = get_sortgroupclause_tle(scl, query->targetList);
Form_pg_attribute attr = TupleDescAttr(matviewRel->rd_att, tle->resno - 1);
IndexElem *iparam;
iparam = makeNode(IndexElem);
iparam->name = pstrdup(NameStr(attr->attname));
iparam->expr = NULL;
iparam->indexcolname = NULL;
iparam->collation = NIL;
iparam->opclass = NIL;
iparam->opclassopts = NIL;
iparam->ordering = SORTBY_DEFAULT;
iparam->nulls_ordering = SORTBY_NULLS_DEFAULT;
index->indexParams = lappend(index->indexParams, iparam);
}
}
else if (query->distinctClause)
{
/* create unique constraint on all columns */
foreach(lc, query->targetList)
{
TargetEntry *tle = (TargetEntry *) lfirst(lc);
Form_pg_attribute attr = TupleDescAttr(matviewRel->rd_att, tle->resno - 1);
IndexElem *iparam;
iparam = makeNode(IndexElem);
iparam->name = pstrdup(NameStr(attr->attname));
iparam->expr = NULL;
iparam->indexcolname = NULL;
iparam->collation = NIL;
iparam->opclass = NIL;
iparam->opclassopts = NIL;
iparam->ordering = SORTBY_DEFAULT;
iparam->nulls_ordering = SORTBY_NULLS_DEFAULT;
index->indexParams = lappend(index->indexParams, iparam);
}
}
else
{
Bitmapset *key_attnos;
/* create index on the base tables' primary key columns */
key_attnos = get_primary_key_attnos_from_query(query, &constraintList);
if (key_attnos)
{
foreach(lc, query->targetList)
{
TargetEntry *tle = (TargetEntry *) lfirst(lc);
Form_pg_attribute attr = TupleDescAttr(matviewRel->rd_att, tle->resno - 1);
if (bms_is_member(tle->resno - FirstLowInvalidHeapAttributeNumber, key_attnos))
{
IndexElem *iparam;
iparam = makeNode(IndexElem);
iparam->name = pstrdup(NameStr(attr->attname));
iparam->expr = NULL;
iparam->indexcolname = NULL;
iparam->collation = NIL;
iparam->opclass = NIL;
iparam->opclassopts = NIL;
iparam->ordering = SORTBY_DEFAULT;
iparam->nulls_ordering = SORTBY_NULLS_DEFAULT;
index->indexParams = lappend(index->indexParams, iparam);
}
}
}
else
{
/* create no index, just notice that an appropriate index is necessary for efficient IVM */
ereport(NOTICE,
(errmsg("could not create an index on materialized view \"%s\" automatically",
RelationGetRelationName(matviewRel)),
errdetail("This target list does not have all the primary key columns, "
"or this view does not contain GROUP BY or DISTINCT clause."),
errhint("Create an index on the materialized view for efficient incremental maintenance.")));
return;
}
}
/* If we have a compatible index, we don't need to create another. */
foreach(indexoidscan, indexoidlist)
{
Oid indexoid = lfirst_oid(indexoidscan);
Relation indexRel;
bool hasCompatibleIndex = false;
indexRel = index_open(indexoid, AccessShareLock);
if (CheckIndexCompatible(indexRel->rd_id,
index->accessMethod,
index->indexParams,
index->excludeOpNames))
hasCompatibleIndex = true;
index_close(indexRel, AccessShareLock);
if (hasCompatibleIndex)
return;
}
address = DefineIndex(RelationGetRelid(matviewRel),
index,
InvalidOid,
InvalidOid,
InvalidOid,
false, true, false, false, true);
ereport(NOTICE,
(errmsg("created index \"%s\" on materialized view \"%s\"",
idxname, RelationGetRelationName(matviewRel))));
/*
* Make dependencies so that the index is dropped if any base tables's
* primary key is dropped.
*/
foreach(lc, constraintList)
{
Oid constraintOid = lfirst_oid(lc);
ObjectAddress refaddr;
refaddr.classId = ConstraintRelationId;
refaddr.objectId = constraintOid;
refaddr.objectSubId = 0;
recordDependencyOn(&address, &refaddr, DEPENDENCY_NORMAL);
}
}
/*
 * get_primary_key_attnos_from_query
 *
 * Identify which target list columns are primary key columns of the base
 * tables used in the query.
 *
 * Returns a Bitmapset of target list positions (1-based) whose expression is
 * a primary key column of some table in the query.  The members are offset
 * by FirstLowInvalidHeapAttributeNumber, the same way get_primary_key_attnos
 * offsets its result.
 *
 * NULL is returned if any table has no primary key, or if a table appearing
 * in the FROM clause has a primary key column that is missing from the
 * target list.  Deferrable pkey constraints are not accepted either (false
 * is passed for get_primary_key_attnos' deferrable-OK argument).
 *
 * The OID reported by get_primary_key_attnos for each table is appended to
 * *constraintList; this happens before the table is checked, so the list
 * may hold entries even when NULL is returned.
 */
static Bitmapset *
get_primary_key_attnos_from_query(Query *query, List **constraintList)
{
	List	   *pending_keys = NIL;
	Bitmapset  *result = NULL;
	Relids		from_rels;
	ListCell   *cell;
	int			pos;

	/*
	 * Build pending_keys: one entry per range table entry, in RTE index
	 * order, holding the primary key attributes of that table which have not
	 * yet been found in the target list.  Non-table RTEs get a NULL entry.
	 */
	foreach(cell, query->rtable)
	{
		RangeTblEntry *rte = (RangeTblEntry *) lfirst(cell);
		Bitmapset  *pkey = NULL;

		if (rte->rtekind == RTE_RELATION)
		{
			Oid			pkey_constraint;

			pkey = get_primary_key_attnos(rte->relid, false, &pkey_constraint);
			*constraintList = lappend_oid(*constraintList, pkey_constraint);

			/*
			 * A table without a usable primary key means we cannot derive
			 * key attributes for this query at all.
			 */
			if (pkey == NULL)
				return NULL;
		}

		pending_keys = lappend(pending_keys, pkey);
	}

	/* Look for primary key columns among the target list entries. */
	pos = 0;
	foreach(cell, query->targetList)
	{
		TargetEntry *tle = (TargetEntry *) flatten_join_alias_vars(query, lfirst(cell));
		Var		   *var;
		Bitmapset  *remaining;
		int			rte_slot;
		int			attbit;

		pos++;					/* 1-based position of this target entry */

		if (!IsA(tle->expr, Var))
			continue;

		var = (Var *) tle->expr;
		rte_slot = var->varno - 1;
		attbit = var->varattno - FirstLowInvalidHeapAttributeNumber;
		remaining = list_nth(pending_keys, rte_slot);

		/* Not a (still pending) primary key column of its table? */
		if (!bms_is_member(attbit, remaining))
			continue;

		/*
		 * Tick the column off the pending set.  Once the set is empty, the
		 * list entry is replaced by NULL.
		 */
		remaining = bms_del_member(remaining, attbit);
		if (bms_is_empty(remaining))
		{
			pending_keys = list_delete_nth_cell(pending_keys, rte_slot);
			pending_keys = list_insert_nth(pending_keys, rte_slot, NULL);
		}

		result = bms_add_member(result, pos - FirstLowInvalidHeapAttributeNumber);
	}

	/* Collect RTE indexes of relations appearing in the FROM clause */
	from_rels = get_relids_in_jointree((Node *) query->jointree, false);

	/*
	 * Every relation in FROM must have had all of its key attributes found
	 * in the target list.  If a pending set is still non-empty for such a
	 * relation, the target list is missing a key attribute: return NULL.
	 */
	pos = 0;
	foreach(cell, pending_keys)
	{
		Bitmapset  *remaining = (Bitmapset *) lfirst(cell);

		pos++;					/* RTE index of this entry */

		if (bms_is_member(pos, from_rels) && !bms_is_empty(remaining))
			return NULL;
	}

	return result;
}
/*
 * Create auto-refresh task for Dynamic Tables.
 *
 * A task named "gp_dynamic_table_refresh_<relation OID>" is defined which
 * runs "REFRESH DYNAMIC TABLE <qualified name>" on the given schedule.  When
 * schedule is NULL, DYNAMIC_TABLE_DEFAULT_REFRESH_INTERVAL is used.  The
 * task is recorded as internally dependent on the dynamic table.
 *
 * pstate is passed through to DefineTask; DynamicTableRel is the (open)
 * dynamic table.
 */
static void
create_dynamic_table_auto_refresh_task(ParseState *pstate, Relation DynamicTableRel, char *schedule)
{
	ObjectAddress refaddr;
	ObjectAddress address;
	StringInfoData buf;
	CreateTaskStmt *task_stmt;
	char	   *dtname = NULL;
	bool		saved_allowSystemTableMods;

	if (schedule == NULL)
		schedule = DYNAMIC_TABLE_DEFAULT_REFRESH_INTERVAL;

	/* Create auto refresh task. */
	task_stmt = makeNode(CreateTaskStmt);

	initStringInfo(&buf);
	appendStringInfo(&buf, "gp_dynamic_table_refresh_%u", RelationGetRelid(DynamicTableRel));
	task_stmt->taskname = pstrdup(buf.data);
	task_stmt->schedule = pstrdup(schedule);
	task_stmt->if_not_exists = false;	/* report error if failed. */

	dtname = quote_qualified_identifier(get_namespace_name(RelationGetNamespace(DynamicTableRel)),
										RelationGetRelationName(DynamicTableRel));

	resetStringInfo(&buf);
	appendStringInfo(&buf, "REFRESH DYNAMIC TABLE %s", dtname);
	task_stmt->sql = pstrdup(buf.data);

	/* Both strings were copied above, so the scratch buffer can go. */
	pfree(buf.data);

	/*
	 * DefineTask is run with allowSystemTableMods forced on.  The previous
	 * value must be restored even if DefineTask raises an error; otherwise
	 * the flag would stay enabled for the rest of the session.
	 */
	saved_allowSystemTableMods = allowSystemTableMods;
	allowSystemTableMods = true;
	PG_TRY();
	{
		address = DefineTask(pstate, task_stmt);
	}
	PG_FINALLY();
	{
		allowSystemTableMods = saved_allowSystemTableMods;
	}
	PG_END_TRY();

	/* Tie the task to the dynamic table with an internal dependency. */
	refaddr.classId = RelationRelationId;
	refaddr.objectId = RelationGetRelid(DynamicTableRel);
	refaddr.objectSubId = 0;

	recordDependencyOn(&address, &refaddr, DEPENDENCY_INTERNAL);
}