blob: ea3408654c2bcbfd4a8b52439f77df7f4abefedb [file]
/*-------------------------------------------------------------------------
*
* matview.c
* materialized view support
*
* Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
*
* IDENTIFICATION
* src/backend/commands/matview.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/amapi.h"
#include "access/genam.h"
#include "access/heapam.h"
#include "access/htup_details.h"
#include "access/multixact.h"
#include "access/tableam.h"
#include "access/xact.h"
#include "access/xlog.h"
#include "catalog/catalog.h"
#include "catalog/gp_matview_aux.h"
#include "catalog/indexing.h"
#include "catalog/namespace.h"
#include "catalog/oid_dispatch.h"
#include "catalog/pg_am.h"
#include "catalog/pg_depend.h"
#include "catalog/pg_trigger.h"
#include "catalog/pg_opclass.h"
#include "catalog/pg_operator.h"
#include "cdb/cdbaocsam.h"
#include "cdb/cdbappendonlyam.h"
#include "cdb/cdbvars.h"
#include "commands/cluster.h"
#include "commands/createas.h"
#include "commands/defrem.h"
#include "commands/matview.h"
#include "commands/tablecmds.h"
#include "commands/tablespace.h"
#include "common/hashfn.h"
#include "executor/executor.h"
#include "executor/nodeShareInputScan.h"
#include "executor/spi.h"
#include "executor/tstoreReceiver.h"
#include "miscadmin.h"
#include "nodes/makefuncs.h"
#include "optimizer/optimizer.h"
#include "parser/analyze.h"
#include "parser/parse_clause.h"
#include "parser/parse_func.h"
#include "parser/parse_relation.h"
#include "parser/parsetree.h"
#include "pgstat.h"
#include "rewrite/rewriteHandler.h"
#include "rewrite/rowsecurity.h"
#include "storage/proc.h"
#include "storage/lmgr.h"
#include "storage/smgr.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/rel.h"
#include "utils/snapmgr.h"
#include "utils/syscache.h"
#include "utils/typcache.h"
/*
 * DR_transientrel
 *
 * DestReceiver state used while refreshing a materialized view: tuples
 * handed to the receiver are inserted into a transient relation.
 */
typedef struct
{
DestReceiver pub; /* publicly-known function pointers */
Oid transientoid; /* OID of new heap into which to store */
Oid oldreloid; /* OID of the matview being refreshed */
bool concurrent; /* true for a CONCURRENTLY refresh */
bool skipData; /* true for WITH NO DATA: received tuples are not stored */
char relpersistence; /* persistence handed to refresh_by_heap_swap */
/* These fields are filled by transientrel_startup: */
Relation transientrel; /* relation to write to */
CommandId output_cid; /* cmin to insert in output tuples */
int ti_options; /* table_tuple_insert performance options */
BulkInsertState bistate; /* bulk insert state */
uint64 processed; /* GPDB: number of tuples inserted */
} DR_transientrel;
/*
 * Sizing constants for the IVM bookkeeping structures.
 * NOTE(review): the two *_HASHSIZE values look like initial hash table sizes;
 * their users are outside this part of the file -- confirm.
 */
#define MV_INIT_QUERYHASHSIZE 32
#define MV_INIT_SNAPSHOTHASHSIZE (2 * MaxBackends)
/* Size in bytes of the snapname buffer in SnapshotDumpEntry */
#define SNAPSHOT_KEYSIZE 128
/*
 * MV_TriggerHashEntry
 *
 * Hash entry for base tables on which IVM trigger is invoked.
 * One entry carries the per-materialized-view state listed below.
 */
typedef struct MV_TriggerHashEntry
{
Oid matview_id; /* OID of the materialized view */
int before_trig_count; /* count of before triggers invoked */
int after_trig_count; /* count of after triggers invoked */
int pid; /* process ID, kept for debugging */
int reference; /* reference count */
Snapshot snapshot; /* Snapshot just before table change */
char *snapname; /* Snapshot name for lookup */
List *tables; /* List of MV_TriggerTable */
bool has_old; /* have tuples been deleted from any table? */
bool has_new; /* have tuples been inserted into any table? */
MemoryContext context; /* The session-scoped memory context. */
ResourceOwner resowner; /* The session-scoped resource owner. */
} MV_TriggerHashEntry;
/*
 * SnapshotDumpEntry
 *
 * Holds information about one snapshot dump entry.
 */
typedef struct SnapshotDumpEntry
{
char snapname[SNAPSHOT_KEYSIZE]; /* Name of the snapshot */
Oid matview_id; /* OID of the materialized view */
int pid; /* Process ID of creator */
dsm_handle handle; /* Handle to the DSM segment */
dsm_segment *segment; /* Pointer to the DSM segment */
} SnapshotDumpEntry;
/*
 * MV_TriggerTable
 *
 * IVM-related data for one table on which the trigger is invoked.
 * Instances are kept in MV_TriggerHashEntry.tables.
 */
typedef struct MV_TriggerTable
{
Oid table_id; /* OID of the modified table */
List *old_tuplestores; /* tuplestores for deleted tuples */
List *new_tuplestores; /* tuplestores for inserted tuples */
List *rte_indexes; /* list of RTE indexes of the modified table */
RangeTblEntry *original_rte; /* the original RTE saved before rewriting query */
Relation rel; /* relation of the modified table */
TupleTableSlot *slot; /* for checking visibility in the pre-state table */
} MV_TriggerTable;
/*
 * File-scope hash table handles used by IVM; both are NULL until created.
 * NOTE(review): presumably mv_trigger_info stores MV_TriggerHashEntry and
 * mv_trigger_snapshot stores SnapshotDumpEntry -- confirm against the code
 * that creates them (not visible in this part of the file).
 */
static HTAB *mv_trigger_info = NULL;
static HTAB *mv_trigger_snapshot = NULL;
/*
 * Kind of IVM operation for the view.
 * NOTE(review): presumably IVM_ADD applies a delta by addition and IVM_SUB by
 * subtraction -- confirm against get_operation_string and its callers.
 */
typedef enum
{
IVM_ADD,
IVM_SUB
} IvmOp;
/* ENR (ephemeral named relation) names for the materialized view deltas */
#define NEW_DELTA_ENRNAME "new_delta"
#define OLD_DELTA_ENRNAME "old_delta"
/*
 * Matview maintenance nesting depth. ExecRefreshMatView saves this value
 * before a concurrent refresh and restores it if the refresh throws.
 */
static int matview_maintenance_depth = 0;
/*
 * Forward declarations of file-local routines.
 */

/* Node construction helpers */
static RefreshClause* MakeRefreshClause(bool concurrent, bool skipData, RangeVar *relation);
static IntoClause* makeIvmIntoClause(const char *enrname, Relation matviewRel);
/* DestReceiver callbacks installed by CreateTransientRelDestReceiver */
static void transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo);
static bool transientrel_receive(TupleTableSlot *slot, DestReceiver *self);
static void transientrel_shutdown(DestReceiver *self);
static void transientrel_destroy(DestReceiver *self);
/* REFRESH support */
static uint64 refresh_matview_datafill(DestReceiver *dest, Query *query,
const char *queryString, RefreshClause *refreshClause);
static uint64 refresh_matview_memoryfill(DestReceiver *dest,Query *query,
QueryEnvironment *queryEnv,
TupleDesc *resultTupleDesc,
const char *queryString, Relation matviewRel);
static char *make_temptable_name_n(char *tempname, int n);
static void refresh_by_match_merge(Oid matviewOid, Oid tempOid, Oid relowner,
int save_sec_context);
static void refresh_by_heap_swap(Oid matviewOid, Oid OIDNewHeap, char relpersistence);
static bool is_usable_unique_index(Relation indexRel);
static void OpenMatViewIncrementalMaintenance(void);
static void CloseMatViewIncrementalMaintenance(void);
/* CBDB: get_matview_query has moved to matview.h; the old declaration is kept disabled */
#if 0
static Query *get_matview_query(Relation matviewRel);
#endif
/* Incremental view maintenance (IVM): query rewriting and delta calculation */
static Query *rewrite_query_for_preupdate_state(Query *query, List *tables,
ParseState *pstate, Oid matviewid);
static void register_delta_ENRs(ParseState *pstate, Query *query, List *tables);
static RangeTblEntry *get_prestate_rte(RangeTblEntry *rte, MV_TriggerTable *table,
QueryEnvironment *queryEnv, Oid matviewid);
static RangeTblEntry *replace_rte_with_delta(RangeTblEntry *rte, MV_TriggerTable *table, bool is_new,
QueryEnvironment *queryEnv);
static Query *rewrite_query_for_counting_and_aggregates(Query *query, ParseState *pstate);
static char* calc_delta_old(Tuplestorestate *ts,Relation matviewRel, MV_TriggerTable *table, int rte_index, Query *query,
DestReceiver *dest_old,
TupleDesc *tupdesc_old,
QueryEnvironment *queryEnv);
static char* calc_delta_new(Tuplestorestate *ts, Relation matviewRel, MV_TriggerTable *table, int rte_index, Query *query,
DestReceiver *dest_new,
TupleDesc *tupdesc_new,
QueryEnvironment *queryEnv);
static Query *rewrite_query_for_postupdate_state(Query *query, MV_TriggerTable *table, int rte_index);
/* IVM: applying deltas to the materialized view */
static void apply_delta(char *old_enr, char *new_enr, MV_TriggerTable *table, Oid matviewOid, Tuplestorestate *old_tuplestores, Tuplestorestate *new_tuplestores,
TupleDesc tupdesc_old, TupleDesc tupdesc_new,
Query *query, bool use_count, char *count_colname);
static void append_set_clause_for_count(const char *resname, StringInfo buf_old,
StringInfo buf_new,StringInfo aggs_list);
static void append_set_clause_for_sum(const char *resname, StringInfo buf_old,
StringInfo buf_new, StringInfo aggs_list);
static void append_set_clause_for_avg(const char *resname, StringInfo buf_old,
StringInfo buf_new, StringInfo aggs_list,
const char *aggtype);
static char *get_operation_string(IvmOp op, const char *col, const char *arg1, const char *arg2,
const char* count_col, const char *castType);
static char *get_null_condition_string(IvmOp op, const char *arg1, const char *arg2,
const char* count_col);
static void apply_old_delta(const char *matviewname, const char *deltaname_old,
List *keys);
static void apply_old_delta_with_count(const char *matviewname, Oid matviewRelid, const char *deltaname_old,
List *keys, StringInfo aggs_list, StringInfo aggs_set,
const char *count_colname);
static void apply_new_delta(const char *matviewname, const char *deltaname_new,
StringInfo target_list);
static void apply_new_delta_with_count(const char *matviewname, const char* deltaname_new,
List *keys, StringInfo target_list, StringInfo aggs_set,
const char* count_colname);
static char *get_matching_condition_string(List *keys);
static void generate_equal(StringInfo querybuf, Oid opttype,
const char *leftop, const char *rightop);
/* IVM: cleanup, snapshot export/import and miscellaneous helpers */
static void clean_up_IVM_hash_entry(MV_TriggerHashEntry *entry, bool is_abort);
static void clean_up_ivm_dsm_entry(MV_TriggerHashEntry *entry);
static void apply_cleanup(Oid matview_id);
static void ivm_export_snapshot(Oid matview_id, char *snapname);
static Snapshot ivm_import_snapshot(const char *idstr);
static void ExecuteTruncateGuts_IVM(Relation matviewRel, Oid matviewOid, Query *query);
static void ivm_set_ts_persitent_name(TriggerData *trigdata, Oid relid, Oid mvid);
/*
 * SetMatViewPopulatedState
 *		Record in pg_class whether a materialized view is populated.
 *
 * relation is the materialized view (asserted to have relkind MATVIEW) and
 * newstate is the value stored into its relispopulated flag.
 *
 * NOTE: caller must be holding an appropriate lock on the relation.
 */
void
SetMatViewPopulatedState(Relation relation, bool newstate)
{
	Relation	classRel;
	HeapTuple	classTup;
	Form_pg_class classForm;
	Oid			matviewId;

	Assert(relation->rd_rel->relkind == RELKIND_MATVIEW);

	matviewId = RelationGetRelid(relation);

	/*
	 * Rewrite the matview's pg_class row.  Besides storing the flag, the
	 * update sends an SI message that makes every backend (ours included)
	 * rebuild its relcache entry for the relation.
	 */
	classRel = table_open(RelationRelationId, RowExclusiveLock);

	classTup = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(matviewId));
	if (!HeapTupleIsValid(classTup))
		elog(ERROR, "cache lookup failed for relation %u", matviewId);

	classForm = (Form_pg_class) GETSTRUCT(classTup);
	classForm->relispopulated = newstate;

	CatalogTupleUpdate(classRel, &classTup->t_self, classTup);

	heap_freetuple(classTup);
	table_close(classRel, RowExclusiveLock);

	/* Bump the command counter so the new pg_class row is visible to us. */
	CommandCounterIncrement();
}
/*
 * MakeRefreshClause
 *		Build a RefreshClause node from the given options.
 *
 * The relation pointer is stored as-is; it is not copied.
 */
static RefreshClause *
MakeRefreshClause(bool concurrent, bool skipData, RangeVar *relation)
{
	RefreshClause *clause = makeNode(RefreshClause);

	clause->relation = relation;
	clause->skipData = skipData;
	clause->concurrent = concurrent;

	return clause;
}
/*
 * SetDynamicTableState
 *		Flag a materialized view as a Dynamic Table in pg_class.
 *
 * This only ever sets relisdynamic to true; there is no way to clear the
 * flag through this function.
 *
 * NOTE: caller must be holding an appropriate lock on the relation.
 */
void
SetDynamicTableState(Relation relation)
{
	Relation	classRel;
	HeapTuple	classTup;
	Form_pg_class classForm;
	Oid			matviewId;

	Assert(relation->rd_rel->relkind == RELKIND_MATVIEW);

	matviewId = RelationGetRelid(relation);

	/*
	 * Rewrite the matview's pg_class row.  Besides storing the flag, the
	 * update sends an SI message that makes every backend (ours included)
	 * rebuild its relcache entry for the relation.
	 */
	classRel = table_open(RelationRelationId, RowExclusiveLock);

	classTup = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(matviewId));
	if (!HeapTupleIsValid(classTup))
		elog(ERROR, "cache lookup failed for relation %u", matviewId);

	classForm = (Form_pg_class) GETSTRUCT(classTup);
	classForm->relisdynamic = true;

	CatalogTupleUpdate(classRel, &classTup->t_self, classTup);

	heap_freetuple(classTup);
	table_close(classRel, RowExclusiveLock);

	/* Bump the command counter so the new pg_class row is visible to us. */
	CommandCounterIncrement();
}
/*
 * SetMatViewIVMState
 *		Record in pg_class whether a materialized view is incrementally
 *		maintained (IVM).
 *
 * relation is the materialized view (asserted to have relkind MATVIEW) and
 * newstate is the value stored into its relisivm flag.
 *
 * NOTE: caller must be holding an appropriate lock on the relation.
 */
void
SetMatViewIVMState(Relation relation, bool newstate)
{
	Relation	classRel;
	HeapTuple	classTup;
	Form_pg_class classForm;
	Oid			matviewId;

	Assert(relation->rd_rel->relkind == RELKIND_MATVIEW);

	matviewId = RelationGetRelid(relation);

	/*
	 * Rewrite the matview's pg_class row.  Besides storing the flag, the
	 * update sends an SI message that makes every backend (ours included)
	 * rebuild its relcache entry for the relation.
	 */
	classRel = table_open(RelationRelationId, RowExclusiveLock);

	classTup = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(matviewId));
	if (!HeapTupleIsValid(classTup))
		elog(ERROR, "cache lookup failed for relation %u", matviewId);

	classForm = (Form_pg_class) GETSTRUCT(classTup);
	classForm->relisivm = newstate;

	CatalogTupleUpdate(classRel, &classTup->t_self, classTup);

	heap_freetuple(classTup);
	table_close(classRel, RowExclusiveLock);

	/* Bump the command counter so the new pg_class row is visible to us. */
	CommandCounterIncrement();
}
/*
 * ExecRefreshMatView -- execute a REFRESH MATERIALIZED VIEW command
 *
 * This refreshes the materialized view by creating a new table and swapping
 * the relfilenodes of the new table and the old materialized view, so the OID
 * of the original materialized view is preserved. Thus we do not lose GRANT
 * nor references to this materialized view.
 *
 * If WITH NO DATA was specified, this is effectively like a TRUNCATE;
 * otherwise it is like a TRUNCATE followed by an INSERT using the SELECT
 * statement associated with the materialized view. The statement node's
 * skipData field shows whether the clause was used.
 *
 * Indexes are rebuilt too, via REINDEX. Since we are effectively bulk-loading
 * the new heap, it's better to create the indexes afterwards than to fill them
 * incrementally while we load.
 *
 * The matview's "populated" state is changed based on whether the contents
 * reflect the result set of the materialized view's query.
 *
 * Parameters:
 *   stmt        - the REFRESH statement; relation, concurrent, skipData and
 *                 isdynamic are read here
 *   queryString - source text, handed on to refresh_matview_datafill
 *   params      - not referenced in this function
 *   qc          - if non-NULL, receives the command tag and processed-row
 *                 count on the normal path; the fast path below returns
 *                 without touching it
 *
 * Returns the ObjectAddress of the materialized view.
 */
ObjectAddress
ExecRefreshMatView(RefreshMatViewStmt *stmt, const char *queryString,
ParamListInfo params, QueryCompletion *qc)
{
Oid matviewOid;
Relation matviewRel;
Query *dataQuery;
Query *viewQuery;
Oid tableSpace;
Oid relowner;
Oid OIDNewHeap;
DestReceiver *dest;
uint64 processed = 0;
bool concurrent;
LOCKMODE lockmode;
char relpersistence;
Oid save_userid;
int save_sec_context;
int save_nestlevel;
ObjectAddress address;
RefreshClause *refreshClause;
bool oldPopulated;
bool ao_has_index;
/* MATERIALIZED_VIEW_FIXME: Refresh MatView is not MPP-fied. */
/* Determine strength of lock needed. */
concurrent = stmt->concurrent;
lockmode = concurrent ? ExclusiveLock : AccessExclusiveLock;
/*
 * Get a lock until end of transaction.
 */
matviewOid = RangeVarGetRelidExtended(stmt->relation,
lockmode, 0,
RangeVarCallbackOwnsTable, NULL);
matviewRel = table_open(matviewOid, NoLock);
relowner = matviewRel->rd_rel->relowner;
/*
 * Switch to the owner's userid, so that any functions are run as that
 * user. Also lock down security-restricted operations and arrange to
 * make GUC variable changes local to this command.
 */
GetUserIdAndSecContext(&save_userid, &save_sec_context);
SetUserIdAndSecContext(relowner,
save_sec_context | SECURITY_RESTRICTED_OPERATION);
save_nestlevel = NewGUCNestLevel();
/* Remember the populated state before it is changed further down. */
oldPopulated = RelationIsPopulated(matviewRel);
/* Make sure it is a materialized view. */
if (matviewRel->rd_rel->relkind != RELKIND_MATVIEW)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("\"%s\" is not a materialized view",
RelationGetRelationName(matviewRel))));
/* Check that CONCURRENTLY is not specified if not populated. */
if (concurrent && !RelationIsPopulated(matviewRel))
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("CONCURRENTLY cannot be used when the materialized view is not populated")));
/* Check that conflicting options have not been specified. */
if (concurrent && stmt->skipData)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("%s and %s options cannot be used together",
"CONCURRENTLY", "WITH NO DATA")));
viewQuery = get_matview_query(matviewRel);
/* For IMMV, we need to rewrite matview query */
if (!stmt->skipData && RelationIsIVM(matviewRel))
dataQuery = rewriteQueryForIMMV(viewQuery,NIL);
else
/* viewQuery may be released in make_new_heap_with_colname, so use a copy. */
dataQuery = copyObject(viewQuery);
/*
 * Check that there is a unique index with no WHERE clause on one or more
 * columns of the materialized view if CONCURRENTLY is specified.
 */
if (concurrent)
{
List *indexoidlist = RelationGetIndexList(matviewRel);
ListCell *indexoidscan;
bool hasUniqueIndex = false;
/* Stop at the first index that is_usable_unique_index accepts. */
foreach(indexoidscan, indexoidlist)
{
Oid indexoid = lfirst_oid(indexoidscan);
Relation indexRel;
indexRel = index_open(indexoid, AccessShareLock);
hasUniqueIndex = is_usable_unique_index(indexRel);
index_close(indexRel, AccessShareLock);
if (hasUniqueIndex)
break;
}
list_free(indexoidlist);
if (!hasUniqueIndex)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("cannot refresh materialized view \"%s\" concurrently",
quote_qualified_identifier(get_namespace_name(RelationGetNamespace(matviewRel)),
RelationGetRelationName(matviewRel))),
errhint("Create a unique index with no WHERE clause on one or more columns of the materialized view.")));
}
/*
 * Check for active uses of the relation in the current transaction, such
 * as open scans.
 *
 * NB: We count on this to protect us against problems with refreshing the
 * data using TABLE_INSERT_FROZEN.
 */
CheckTableNotInUse(matviewRel, "REFRESH MATERIALIZED VIEW");
/*
 * Fast path to REFRESH a view:
 * avoid doing the real REFRESH if the data of the view is already up to
 * date, since a real REFRESH would then produce logically the same
 * contents. This can save a lot of work, e.g. for a cron task that
 * refreshes the view periodically, or for users who refresh manually
 * every time.
 *
 * The original comment states that this feature defaults to on;
 * gp_enable_refresh_fast_path lets users decide whether they want a real
 * REFRESH instead.
 *
 * The fast path is not taken for IVM views, for WITH NO DATA, or when the
 * view has foreign tables. Note that it returns without setting "qc".
 */
if (gp_enable_refresh_fast_path &&
!RelationIsIVM(matviewRel) &&
!stmt->skipData &&
MatviewIsUpToDate(matviewOid) &&
!MatviewHasForeignTables(matviewOid))
{
table_close(matviewRel, NoLock);
/* Roll back any GUC changes */
AtEOXact_GUC(false, save_nestlevel);
/* Restore userid and security context */
SetUserIdAndSecContext(save_userid, save_sec_context);
ObjectAddressSet(address, RelationRelationId, matviewOid);
return address;
}
/*
 * Tentatively mark the matview as populated or not (this will roll back
 * if we fail later).
 */
SetMatViewPopulatedState(matviewRel, !stmt->skipData);
if (IS_QD_OR_SINGLENODE())
{
/*
 * Update view info:
 * It's actually a TRUNCATE command if skipData is true.
 */
if (stmt->skipData)
SetMatviewAuxStatus(RelationGetRelid(matviewRel), MV_DATA_STATUS_EXPIRED);
else
SetMatviewAuxStatus(RelationGetRelid(matviewRel), MV_DATA_STATUS_UP_TO_DATE);
}
/*
 * The stored query was rewritten at the time of the MV definition, but
 * has not been scribbled on by the planner.
 *
 * GPDB: using the original query directly may leave dangling pointers if
 * the shared-inval queue overflows, because that forces a rebuild of the
 * matview's relcache entry. During the rebuild,
 * oldRel->rule (parentStmtType = PARENTSTMTTYPE_REFRESH_MATVIEW) is found
 * to differ from newRel->rule (parentStmtType = PARENTSTMTTYPE_NONE), so
 * oldRel->rule (i.e. dataQuery) would be released. That is why dataQuery
 * is a private copy here.
 */
Assert(IsA(dataQuery, Query));
dataQuery->parentStmtType = PARENTSTMTTYPE_REFRESH_MATVIEW;
/* Concurrent refresh builds new data in temp tablespace, and does diff. */
if (concurrent)
{
tableSpace = GetDefaultTablespace(RELPERSISTENCE_TEMP, false);
relpersistence = RELPERSISTENCE_TEMP;
}
else
{
tableSpace = matviewRel->rd_rel->reltablespace;
relpersistence = matviewRel->rd_rel->relpersistence;
}
/* Delete IMMV triggers when an IVM view is refreshed WITH NO DATA. */
if (RelationIsIVM(matviewRel) && stmt->skipData)
{
Relation tgRel;
Relation depRel;
ScanKeyData key;
SysScanDesc scan;
HeapTuple tup;
ObjectAddresses *immv_triggers;
immv_triggers = new_object_addresses();
tgRel = table_open(TriggerRelationId, RowExclusiveLock);
depRel = table_open(DependRelationId, RowExclusiveLock);
/* Search for triggers that depend on the IMMV. */
ScanKeyInit(&key,
Anum_pg_depend_refobjid,
BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(matviewOid));
scan = systable_beginscan(depRel, DependReferenceIndexId, true,
NULL, 1, &key);
while ((tup = systable_getnext(scan)) != NULL)
{
ObjectAddress obj;
Form_pg_depend foundDep = (Form_pg_depend) GETSTRUCT(tup);
/* Only dependent objects that are triggers are of interest. */
if (foundDep->classid == TriggerRelationId)
{
HeapTuple tgtup;
ScanKeyData tgkey[1];
SysScanDesc tgscan;
Form_pg_trigger tgform;
/* Find the trigger name. */
ScanKeyInit(&tgkey[0],
Anum_pg_trigger_oid,
BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(foundDep->objid));
tgscan = systable_beginscan(tgRel, TriggerOidIndexId, true,
NULL, 1, tgkey);
tgtup = systable_getnext(tgscan);
if (!HeapTupleIsValid(tgtup))
elog(ERROR, "could not find tuple for immv trigger %u", foundDep->objid);
tgform = (Form_pg_trigger) GETSTRUCT(tgtup);
/* If the trigger was created by IMMV (name prefix match), delete it. */
if (strncmp(NameStr(tgform->tgname), "IVM_trigger_", 12) == 0)
{
obj.classId = foundDep->classid;
obj.objectId = foundDep->objid;
obj.objectSubId = foundDep->refobjsubid;
add_exact_object_address(&obj, immv_triggers);
}
systable_endscan(tgscan);
}
}
systable_endscan(scan);
/* Drop all collected triggers in one go. */
performMultipleDeletions(immv_triggers, DROP_RESTRICT, PERFORM_DELETION_INTERNAL);
table_close(depRel, RowExclusiveLock);
table_close(tgRel, RowExclusiveLock);
free_object_addresses(immv_triggers);
}
/*
 * Fix issue https://github.com/apache/cloudberry/issues/865
 * Create a blockdir for a Materialized View with AO/AOCS storage that
 * has an index.
 */
ao_has_index = matviewRel->rd_rel->relhasindex && RelationIsAppendOptimized(matviewRel);
/*
 * Create the transient table that will receive the regenerated data. Lock
 * it against access by any other process until commit (by which time it
 * will be gone).
 */
OIDNewHeap = make_new_heap_with_colname(matviewOid, tableSpace, matviewRel->rd_rel->relam, relpersistence,
ExclusiveLock, ao_has_index, true, "_$");
LockRelationOid(OIDNewHeap, AccessExclusiveLock);
dest = CreateTransientRelDestReceiver(OIDNewHeap, matviewOid, concurrent, relpersistence,
stmt->skipData);
refreshClause = MakeRefreshClause(concurrent, stmt->skipData, stmt->relation);
/*
 * Only in dispatcher role, we should set intoPolicy, else it should remain NULL.
 */
if (GP_ROLE_DISPATCH == Gp_role)
{
dataQuery->intoPolicy = matviewRel->rd_cdbpolicy;
}
/* Generate the data, if wanted. */
/*
 * In GPDB, we call refresh_matview_datafill() even when WITH NO DATA was
 * specified, because it will dispatch the operation to the segments.
 */
processed = refresh_matview_datafill(dest, dataQuery, queryString, refreshClause);
/* Make the matview match the newly generated data. */
if (concurrent)
{
int old_depth = matview_maintenance_depth;
/* On error, put the maintenance depth back before re-throwing. */
PG_TRY();
{
refresh_by_match_merge(matviewOid, OIDNewHeap, relowner,
save_sec_context);
}
PG_CATCH();
{
matview_maintenance_depth = old_depth;
PG_RE_THROW();
}
PG_END_TRY();
Assert(matview_maintenance_depth == old_depth);
}
else
{
refresh_by_heap_swap(matviewOid, OIDNewHeap, relpersistence);
/*
 * Inform stats collector about our activity: basically, we truncated
 * the matview and inserted some new data. (The concurrent code path
 * above doesn't need to worry about this because the inserts and
 * deletes it issues get counted by lower-level code.)
 *
 * In GPDB, we should count the pgstat on segments and union them on
 * QD, so both the segments and coordinator will have pgstat for this
 * relation. See pgstat_combine_from_qe(pgstat.c) for more details.
 * Then comment out the below codes on the dispatcher side and leave
 * the current comment to avoid further upstream merge issues.
 * The pgstat is updated in function transientrel_shutdown on QE side.
 * This related to issue: https://github.com/greenplum-db/gpdb/issues/11375
 */
// pgstat_count_truncate(matviewRel);
// if (!stmt->skipData)
// pgstat_count_heap_insert(matviewRel, processed);
}
/*
 * An IVM view that was not populated before this refresh and now holds
 * data gets its triggers created on the base tables.
 */
if (!stmt->skipData && RelationIsIVM(matviewRel) && !oldPopulated)
{
CreateIvmTriggersOnBaseTables(dataQuery, matviewOid);
}
table_close(matviewRel, NoLock);
/* Roll back any GUC changes */
AtEOXact_GUC(false, save_nestlevel);
/* Restore userid and security context */
SetUserIdAndSecContext(save_userid, save_sec_context);
ObjectAddressSet(address, RelationRelationId, matviewOid);
/*
 * Save the rowcount so that pg_stat_statements can track the total number
 * of rows processed by REFRESH MATERIALIZED VIEW command. Note that we
 * still don't display the rowcount in the command completion tag output,
 * i.e., the display_rowcount flag of CMDTAG_REFRESH_MATERIALIZED_VIEW
 * command tag is left false in cmdtaglist.h. Otherwise, the change of
 * completion tag output might break applications using it.
 */
if (qc)
{
if (stmt->isdynamic)
SetQueryCompletion(qc, CMDTAG_REFRESH_DYNAMIC_TABLE, processed);
else
SetQueryCompletion(qc, CMDTAG_REFRESH_MATERIALIZED_VIEW, processed);
}
return address;
}
/*
 * refresh_matview_datafill
 *
 * Rewrite, plan and run "query", routing every result row to "dest" (the
 * receiver that loads the target matview).
 *
 * "refreshClause" is attached to the resulting plan.  When its skipData flag
 * is set, the query gets a constant-false qual, so the plan is still run
 * (and dispatched) but produces no rows.
 *
 * Returns the es_processed count taken from the QueryDesc.
 */
static uint64
refresh_matview_datafill(DestReceiver *dest, Query *query,
						 const char *queryString, RefreshClause *refreshClause)
{
	List	   *oidAssignments;
	List	   *rewriteResult;
	Query	   *workQuery;
	PlannedStmt *pstmt;
	QueryDesc  *qdesc;
	uint64		nrows;

	/*
	 * Cloudberry specific behavior:
	 *
	 * In the MPP architecture the temp table must have the same OIDs on the
	 * QD and on every QE.  Those OIDs are kept in the static variable
	 * dispatch_oids, which is consumed by each dispatch.
	 *
	 * Planning may pre-evaluate a function expression; if that function is
	 * written in SQL or PL/pgSQL, evaluating it dispatches and uses up the
	 * saved OIDs, so the later dispatch for the refresh itself would not
	 * find them on the QEs.
	 *
	 * To avoid that, stash the assignments now and put them back right
	 * before the refresh is started.
	 *
	 * See Github Issue for details:
	 * https://github.com/greenplum-db/gpdb/issues/11956
	 */
	oidAssignments = SaveOidAssignments();

	/* Work on a private copy so the caller's query tree stays untouched. */
	workQuery = copyObject(query);
	AcquireRewriteLocks(workQuery, true, false);
	rewriteResult = QueryRewrite(workQuery);

	/* A SELECT must rewrite to exactly one SELECT. */
	if (list_length(rewriteResult) != 1)
		elog(ERROR, "unexpected rewrite result for REFRESH MATERIALIZED VIEW");
	workQuery = (Query *) linitial(rewriteResult);

	/*
	 * In GPDB the refresh clause reaches the segments as part of executing
	 * the plan.  WITH NO DATA behaves like a TRUNCATE, though, so there is no
	 * point in running a potentially long query for it.
	 *
	 * Replacing the qual with constant FALSE gives a plan of this shape,
	 * which dispatches the refresh clause without doing the real work:
	 *
	 *	 Motion
	 *	   Result  (cost=0.00..0.01 rows=1 width=0)
	 *		 One-Time Filter: false
	 *
	 * The planner puts the Motion on top according to the matview's
	 * distribution policy found in the query's intoPolicy.
	 */
	if (refreshClause->skipData)
		workQuery->jointree->quals = (Node *) makeBoolConst(false, false);

	/* Give the user a chance to cancel before planning. */
	CHECK_FOR_INTERRUPTS();

	/* Build the plan that produces the refresh data. */
	pstmt = pg_plan_query(workQuery, queryString, CURSOR_OPT_PARALLEL_OK, NULL);
	pstmt->refreshClause = refreshClause;

	/*
	 * Run with a snapshot whose command ID is current, so that the query
	 * sees the effects of anything executed earlier.  (This could only
	 * matter if planning ran an allegedly-stable function that modified the
	 * database, but it is cheap insurance.)
	 */
	PushCopiedSnapshot(GetActiveSnapshot());
	UpdateActiveSnapshotCommandId();

	/* Set up a QueryDesc that sends its output to our receiver. */
	qdesc = CreateQueryDesc(pstmt, queryString,
							GetActiveSnapshot(), InvalidSnapshot,
							dest, NULL, NULL, 0);

	/* Put the OID assignments back for the dispatch done by the executor. */
	RestoreOidAssignments(oidAssignments);

	/* Start, run and shut down the executor. */
	ExecutorStart(qdesc, 0);
	ExecutorRun(qdesc, ForwardScanDirection, 0L, true);
	ExecutorFinish(qdesc);
	ExecutorEnd(qdesc);

	/* Fetch the row count before the QueryDesc is freed. */
	nrows = qdesc->es_processed;

	FreeQueryDesc(qdesc);
	PopActiveSnapshot();

	return nrows;
}
/*
 * CreateTransientRelDestReceiver
 *		Allocate a DR_transientrel receiver and return it as a DestReceiver.
 *
 * The arguments are stored in the receiver for later use by the
 * transientrel_* callbacks; the fields owned by transientrel_startup are
 * left zeroed.
 */
DestReceiver *
CreateTransientRelDestReceiver(Oid transientoid, Oid oldreloid, bool concurrent,
							   char relpersistence, bool skipdata)
{
	DR_transientrel *receiver;

	receiver = (DR_transientrel *) palloc0(sizeof(DR_transientrel));

	/* callback table */
	receiver->pub.receiveSlot = transientrel_receive;
	receiver->pub.rStartup = transientrel_startup;
	receiver->pub.rShutdown = transientrel_shutdown;
	receiver->pub.rDestroy = transientrel_destroy;
	receiver->pub.mydest = DestTransientRel;

	/* caller-supplied settings */
	receiver->transientoid = transientoid;
	receiver->oldreloid = oldreloid;
	receiver->concurrent = concurrent;
	receiver->skipData = skipdata;
	receiver->relpersistence = relpersistence;

	/* no tuples stored yet */
	receiver->processed = 0;

	/* "pub" is the first member, so this is the same address as receiver */
	return &receiver->pub;
}
/*
 * transientrel_init
 *		Prepare the transient-relation DestReceiver for a refresh described
 *		by the RefreshClause attached to queryDesc's planned statement.
 *
 * Locks the materialized view named in the clause, tentatively updates its
 * populated flag, creates the transient heap that will receive the data and
 * stores a matching receiver in queryDesc->dest.
 *
 * NOTE(review): this mirrors the setup done in ExecRefreshMatView and is
 * presumably the entry point used where only the plan (not the statement)
 * is available, i.e. on segments -- confirm against the caller.
 */
void
transientrel_init(QueryDesc *queryDesc)
{
	Oid			matviewOid;
	Relation	matviewRel;
	Oid			tableSpace;
	Oid			OIDNewHeap;
	bool		concurrent;
	char		relpersistence;
	LOCKMODE	lockmode;
	RefreshClause *refreshClause;
	bool		ao_has_index;

	refreshClause = queryDesc->plannedstmt->refreshClause;

	/* Determine strength of lock needed. */
	concurrent = refreshClause->concurrent;
	lockmode = concurrent ? ExclusiveLock : AccessExclusiveLock;

	/*
	 * Get a lock until end of transaction.
	 */
	matviewOid = RangeVarGetRelidExtended(refreshClause->relation,
										  lockmode, 0,
										  RangeVarCallbackOwnsTable, NULL);

	/*
	 * Use table_open/table_close like the rest of this file, rather than
	 * the legacy heap_open/heap_close spelling.
	 */
	matviewRel = table_open(matviewOid, NoLock);

	/*
	 * Tentatively mark the matview as populated or not (this will roll back
	 * if we fail later).
	 */
	SetMatViewPopulatedState(matviewRel, !refreshClause->skipData);

	/* Concurrent refresh builds new data in temp tablespace, and does diff. */
	if (concurrent)
	{
		tableSpace = GetDefaultTablespace(RELPERSISTENCE_TEMP, false);
		relpersistence = RELPERSISTENCE_TEMP;
	}
	else
	{
		tableSpace = matviewRel->rd_rel->reltablespace;
		relpersistence = matviewRel->rd_rel->relpersistence;
	}

	/* An append-optimized matview with an index needs a block directory. */
	ao_has_index = matviewRel->rd_rel->relhasindex && RelationIsAppendOptimized(matviewRel);

	/*
	 * Create the transient table that will receive the regenerated data. Lock
	 * it against access by any other process until commit (by which time it
	 * will be gone).
	 */
	OIDNewHeap = make_new_heap(matviewOid, tableSpace, matviewRel->rd_rel->relam,
							   relpersistence,
							   ExclusiveLock, ao_has_index, false);
	LockRelationOid(OIDNewHeap, AccessExclusiveLock);

	queryDesc->dest = CreateTransientRelDestReceiver(OIDNewHeap, matviewOid, concurrent,
													 relpersistence, refreshClause->skipData);

	/* Close the matview but keep the lock taken above. */
	table_close(matviewRel, NoLock);
}
/*
 * transientrel_startup --- executor startup
 *
 * Opens the transient relation and initializes the insertion state kept in
 * the receiver.  "operation" and "typeinfo" are not used.
 */
static void
transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
{
	DR_transientrel *state = (DR_transientrel *) self;
	Relation	rel = table_open(state->transientoid, NoLock);

	/* Set up the private fields that the other callbacks rely on. */
	state->transientrel = rel;
	state->output_cid = GetCurrentCommandId(true);
	state->ti_options = TABLE_INSERT_SKIP_FSM | TABLE_INSERT_FROZEN;
	state->bistate = GetBulkInsertState();
	state->processed = 0;

	/* Storage-specific preparation for the upcoming inserts. */
	if (RelationIsAoRows(rel))
		appendonly_dml_init(rel, CMD_INSERT);
	else if (RelationIsAoCols(rel))
		aoco_dml_init(rel, CMD_INSERT);
	else if (ext_dml_init_hook)
		ext_dml_init_hook(rel, CMD_INSERT);

	/*
	 * A valid smgr_targblock would mean that something has already written
	 * to the relation.  That might be harmless, but this code does not
	 * expect it.
	 */
	Assert(RelationGetTargetBlock(rel) == InvalidBlockNumber);
}
/*
 * transientrel_receive --- receive one tuple
 *
 * Stores the tuple in the transient relation and counts it, unless the
 * receiver was created with skipData, in which case the tuple is ignored.
 * Always returns true.
 */
static bool
transientrel_receive(TupleTableSlot *slot, DestReceiver *self)
{
	DR_transientrel *state = (DR_transientrel *) self;

	if (!state->skipData)
	{
		/*
		 * The incoming slot need not be of the target relation's slot type.
		 * table_tuple_insert() copes with that, at a small cost; converting
		 * to a slot of the right type first would not be cheap either.  It
		 * also means per-AM data (a tuple's xmin, say) is not accessible,
		 * but nothing here needs it.
		 */
		table_tuple_insert(state->transientrel,
						   slot,
						   state->output_cid,
						   state->ti_options,
						   state->bistate);
		state->processed++;

		/* The relation is brand new, so there are no indexes to update. */
	}

	return true;
}
/*
 * transientrel_shutdown --- executor end
 *
 * Finishes the bulk insert and closes the transient relation.  On a QE
 * (Gp_role == GP_ROLE_EXECUTE) doing a non-concurrent refresh, it also swaps
 * the new heap into the matview and reports the activity to pgstat.
 */
static void
transientrel_shutdown(DestReceiver *self)
{
	DR_transientrel *state = (DR_transientrel *) self;
	Relation	matviewRel;

	FreeBulkInsertState(state->bistate);
	table_finish_bulk_insert(state->transientrel, state->ti_options);

	/* Close the transient relation; its lock is held until commit. */
	table_close(state->transientrel, NoLock);
	state->transientrel = NULL;

	/* Everything below applies only to a non-concurrent refresh on a QE. */
	if (Gp_role != GP_ROLE_EXECUTE || state->concurrent)
		return;

	matviewRel = table_open(state->oldreloid, NoLock);

	refresh_by_heap_swap(state->oldreloid, state->transientoid,
						 state->relpersistence);

	/*
	 * In GPDB the pgstat counters are collected on the segments and combined
	 * on the QD, so that both segments and coordinator have pgstat data for
	 * the relation; see pgstat_combine_from_qe (pgstat.c).  Each QE counts
	 * its own activity here and reports it to the QD if needed.
	 * Related issue: https://github.com/greenplum-db/gpdb/issues/11375
	 */
	pgstat_count_truncate(matviewRel);
	if (!state->skipData)
		pgstat_count_heap_insert(matviewRel, state->processed);

	table_close(matviewRel, NoLock);
}
/*
 * transientrel_destroy --- release DestReceiver object
 *
 * Frees the receiver's own allocation; nothing else is released here.
 */
static void
transientrel_destroy(DestReceiver *self)
{
	DR_transientrel *receiver = (DR_transientrel *) self;

	pfree(receiver);
}
/*
 * Build a new table name from a qualified temporary table name by appending
 * an underscore and the decimal form of "n".  The result is palloc'd (it is
 * the StringInfo's own buffer) and belongs to the caller.
 *
 * No re-quoting is done, so an input such as "FOO"."BAR" would not yield a
 * valid SQL name.  That is acceptable today because the table-name part of
 * the input is always of the form "pg_temp_NNN" (cf make_new_heap()) and is
 * therefore never double-quoted; this may need more work someday.
 */
static char *
make_temptable_name_n(char *tempname, int n)
{
	StringInfoData result;

	initStringInfo(&result);

	/* base name first, then the "_<n>" suffix */
	appendStringInfoString(&result, tempname);
	appendStringInfo(&result, "_%d", n);

	return result.data;
}
/*
 * refresh_by_match_merge
 *
 * Refresh a materialized view with transactional semantics, while allowing
 * concurrent reads.
 *
 * This is called after a new version of the data has been created in a
 * temporary table.  It performs a full outer join against the old version of
 * the data, producing "diff" results.  This join cannot work if there are any
 * duplicated rows in either the old or new versions, in the sense that every
 * column would compare as equal between the two rows.  It does work correctly
 * in the face of rows which have at least one NULL value, with all non-NULL
 * columns equal.  The behavior of NULLs on equality tests and on UNIQUE
 * indexes turns out to be quite convenient here; the tests we need to make
 * are consistent with default behavior.  If there is at least one UNIQUE
 * index on the materialized view, we have exactly the guarantee we need.
 *
 * The temporary table used to hold the diff results is created LIKE the
 * new-data table, so it carries the new row's columns directly (all NULL when
 * only the old row exists), plus two helper columns: "tid", the ctid of the
 * old record, and "sid", its gp_segment_id (both NULL when only the new row
 * exists).
 *
 * Once we have the diff table, we perform set-based DELETE and INSERT
 * operations against the materialized view, and discard both temporary
 * tables.
 *
 * Everything from the generation of the new data to applying the differences
 * takes place under cover of an ExclusiveLock, since it seems as though we
 * would want to prohibit not only concurrent REFRESH operations, but also
 * incremental maintenance.  It also doesn't seem reasonable or safe to allow
 * SELECT FOR UPDATE or SELECT FOR SHARE on rows being updated or deleted by
 * this command.
 *
 * Parameters:
 *	matviewOid			OID of the materialized view being refreshed
 *	tempOid				OID of the temp table holding the new contents
 *	relowner			user id passed to SetUserIdAndSecContext() around the
 *						creation of the diff table
 *	save_sec_context	caller's saved security context flags, combined with
 *						SECURITY_LOCAL_USERID_CHANGE while the diff table is
 *						created and SECURITY_RESTRICTED_OPERATION afterwards
 *
 * Any SPI failure is reported with elog(ERROR); duplicate rows in the new
 * data are reported with ERRCODE_CARDINALITY_VIOLATION.
 */
static void
refresh_by_match_merge(Oid matviewOid, Oid tempOid, Oid relowner,
					   int save_sec_context)
{
	StringInfoData querybuf;
	Relation	matviewRel;
	Relation	tempRel;
	char	   *matviewname;
	char	   *tempname;
	char	   *diffname;
	TupleDesc	tupdesc;
	TupleDesc	newHeapDesc;
	bool		foundUniqueIndex;
	List	   *indexoidlist;
	ListCell   *indexoidscan;
	int16		relnatts;
	Oid		   *opUsedForQual;

	initStringInfo(&querybuf);
	matviewRel = table_open(matviewOid, NoLock);
	matviewname = quote_qualified_identifier(get_namespace_name(RelationGetNamespace(matviewRel)),
											 RelationGetRelationName(matviewRel));
	tempRel = table_open(tempOid, NoLock);
	tempname = quote_qualified_identifier(get_namespace_name(RelationGetNamespace(tempRel)),
										  RelationGetRelationName(tempRel));
	diffname = make_temptable_name_n(tempname, 2);

	relnatts = RelationGetNumberOfAttributes(matviewRel);

	/* Open SPI context. */
	if (SPI_connect() != SPI_OK_CONNECT)
		elog(ERROR, "SPI_connect failed");

	/* Analyze the temp table with the new contents. */
	appendStringInfo(&querybuf, "ANALYZE %s", tempname);
	if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
		elog(ERROR, "SPI_exec failed: %s", querybuf.data);

	/*
	 * We need to ensure that there are not duplicate rows without NULLs in
	 * the new data set before we can count on the "diff" results.  Check for
	 * that in a way that allows showing the first duplicated row found.  Even
	 * after we pass this test, a unique index on the materialized view may
	 * find a duplicate key problem.
	 *
	 * Note: here and below, we use "tablename.*::tablerowtype" as a hack to
	 * keep ".*" from being expanded into multiple columns in a SELECT list.
	 * Compare ruleutils.c's get_variable().
	 *
	 * The gp_segment_id comparison restricts the duplicate search to rows
	 * stored on the same segment.
	 */
	resetStringInfo(&querybuf);
	appendStringInfo(&querybuf,
					 "SELECT newdata.*::%s FROM %s newdata "
					 "WHERE newdata.* IS NOT NULL AND EXISTS "
					 "(SELECT 1 FROM %s newdata2 WHERE newdata2.* IS NOT NULL "
					 "AND newdata2.* OPERATOR(pg_catalog.*=) newdata.* "
					 "AND newdata2.ctid OPERATOR(pg_catalog.<>) "
					 "newdata.ctid AND newdata2.gp_segment_id "
					 "OPERATOR(pg_catalog.=) newdata.gp_segment_id)",
					 tempname, tempname, tempname);
	if (SPI_execute(querybuf.data, false, 1) != SPI_OK_SELECT)
		elog(ERROR, "SPI_exec failed: %s", querybuf.data);
	if (SPI_processed > 0)
	{
		/*
		 * Note that this ereport() is returning data to the user.  Generally,
		 * we would want to make sure that the user has been granted access to
		 * this data.  However, REFRESH MAT VIEW is only able to be run by the
		 * owner of the mat view (or a superuser) and therefore there is no
		 * need to check for access to data in the mat view.
		 */
		ereport(ERROR,
				(errcode(ERRCODE_CARDINALITY_VIOLATION),
				 errmsg("new data for materialized view \"%s\" contains duplicate rows without any null columns",
						RelationGetRelationName(matviewRel)),
				 errdetail("Row: %s",
						   SPI_getvalue(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 1))));
	}

	/*
	 * Create the temporary "diff" table.
	 *
	 * Temporarily switch out of the SECURITY_RESTRICTED_OPERATION context,
	 * because you cannot create temp tables in SRO context.  For extra
	 * paranoia, add the helper columns only after switching back to SRO
	 * context.
	 *
	 * Greenplum doesn't store diffs in a composite type column, instead it
	 * creates a similar table with the same distribution for performance
	 * considerations.
	 *
	 * NOTE(review): a matview column that is itself named "tid" or "sid"
	 * would presumably collide with the helper columns added below -- confirm
	 * whether that case needs handling.
	 */
	SetUserIdAndSecContext(relowner,
						   save_sec_context | SECURITY_LOCAL_USERID_CHANGE);
	resetStringInfo(&querybuf);
	appendStringInfo(&querybuf,
					 "CREATE TEMP TABLE %s (LIKE %s)",
					 diffname, tempname);
	if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
		elog(ERROR, "SPI_exec failed: %s", querybuf.data);
	SetUserIdAndSecContext(relowner,
						   save_sec_context | SECURITY_RESTRICTED_OPERATION);

	resetStringInfo(&querybuf);
	appendStringInfo(&querybuf,
					 "ALTER TABLE %s ADD COLUMN tid pg_catalog.tid",
					 diffname);
	if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
		elog(ERROR, "SPI_exec failed: %s", querybuf.data);

	resetStringInfo(&querybuf);
	appendStringInfo(&querybuf,
					 "ALTER TABLE %s ADD COLUMN sid pg_catalog.int4",
					 diffname);
	if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
		elog(ERROR, "SPI_exec failed: %s", querybuf.data);

	/* Start building the query for populating the diff table. */
	resetStringInfo(&querybuf);
	appendStringInfo(&querybuf,
					 "INSERT INTO %s "
					 "SELECT newdata.*, mv.ctid AS tid, mv.gp_segment_id as sid "
					 "FROM %s mv FULL JOIN %s newdata ON (",
					 diffname, matviewname, tempname);

	/*
	 * Get the list of index OIDs for the table from the relcache, and look up
	 * each one in the pg_index syscache.  We will test for equality on all
	 * columns present in all unique indexes which only reference columns and
	 * include all rows.
	 */
	tupdesc = matviewRel->rd_att;
	newHeapDesc = tempRel->rd_att;
	opUsedForQual = (Oid *) palloc0(sizeof(Oid) * relnatts);
	foundUniqueIndex = false;

	indexoidlist = RelationGetIndexList(matviewRel);

	foreach(indexoidscan, indexoidlist)
	{
		Oid			indexoid = lfirst_oid(indexoidscan);
		Relation	indexRel;

		indexRel = index_open(indexoid, RowExclusiveLock);
		if (is_usable_unique_index(indexRel))
		{
			Form_pg_index indexStruct = indexRel->rd_index;
			int			indnkeyatts = indexStruct->indnkeyatts;
			oidvector  *indclass;
			Datum		indclassDatum;
			bool		isnull;
			int			i;

			/* Must get indclass the hard way. */
			indclassDatum = SysCacheGetAttr(INDEXRELID,
											indexRel->rd_indextuple,
											Anum_pg_index_indclass,
											&isnull);
			Assert(!isnull);
			indclass = (oidvector *) DatumGetPointer(indclassDatum);

			/* Add quals for all columns from this index. */
			for (i = 0; i < indnkeyatts; i++)
			{
				int			attnum = indexStruct->indkey.values[i];
				Oid			opclass = indclass->values[i];
				Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
				Form_pg_attribute newattr = TupleDescAttr(newHeapDesc, attnum - 1);
				Oid			attrtype = attr->atttypid;
				HeapTuple	cla_ht;
				Form_pg_opclass cla_tup;
				Oid			opfamily;
				Oid			opcintype;
				Oid			op;
				const char *leftop;
				const char *rightop;

				/*
				 * Identify the equality operator associated with this index
				 * column.  First we need to look up the column's opclass.
				 */
				cla_ht = SearchSysCache1(CLAOID, ObjectIdGetDatum(opclass));
				if (!HeapTupleIsValid(cla_ht))
					elog(ERROR, "cache lookup failed for opclass %u", opclass);
				cla_tup = (Form_pg_opclass) GETSTRUCT(cla_ht);
				Assert(IsIndexAccessMethod(cla_tup->opcmethod, BTREE_AM_OID));
				opfamily = cla_tup->opcfamily;
				opcintype = cla_tup->opcintype;
				ReleaseSysCache(cla_ht);

				op = get_opfamily_member(opfamily, opcintype, opcintype,
										 BTEqualStrategyNumber);
				if (!OidIsValid(op))
					elog(ERROR, "missing operator %d(%u,%u) in opfamily %u",
						 BTEqualStrategyNumber, opcintype, opcintype, opfamily);

				/*
				 * If we find the same column with the same equality semantics
				 * in more than one index, we only need to emit the equality
				 * clause once.
				 *
				 * Since we only remember the last equality operator, this
				 * code could be fooled into emitting duplicate clauses given
				 * multiple indexes with several different opclasses ... but
				 * that's so unlikely it doesn't seem worth spending extra
				 * code to avoid.
				 */
				if (opUsedForQual[attnum - 1] == op)
					continue;
				opUsedForQual[attnum - 1] = op;

				/*
				 * Actually add the qual, ANDed with any others.
				 */
				if (foundUniqueIndex)
					appendStringInfoString(&querybuf, " AND ");

				leftop = quote_qualified_identifier("newdata",
													NameStr(newattr->attname));
				rightop = quote_qualified_identifier("mv",
													 NameStr(attr->attname));

				generate_operator_clause(&querybuf,
										 leftop, attrtype,
										 op,
										 rightop, attrtype);

				foundUniqueIndex = true;
			}
		}

		/* Keep the locks, since we're about to run DML which needs them. */
		index_close(indexRel, NoLock);
	}

	list_free(indexoidlist);

	/*
	 * There must be at least one usable unique index on the matview.
	 *
	 * ExecRefreshMatView() checks that after taking the exclusive lock on the
	 * matview.  So at least one unique index is guaranteed to exist here
	 * because the lock is still being held; so an Assert seems sufficient.
	 */
	Assert(foundUniqueIndex);

	appendStringInfoString(&querybuf,
						   " AND newdata.* OPERATOR(pg_catalog.*=) mv.*) "
						   "WHERE newdata.* IS NULL OR mv.* IS NULL "
						   "ORDER BY tid");

	/* Populate the temporary "diff" table. */
	if (SPI_exec(querybuf.data, 0) != SPI_OK_INSERT)
		elog(ERROR, "SPI_exec failed: %s", querybuf.data);

	/*
	 * We have no further use for data from the "full-data" temp table; it is
	 * dropped together with the diff table at the end of this function.
	 */

	/* Analyze the diff table. */
	resetStringInfo(&querybuf);
	appendStringInfo(&querybuf, "ANALYZE %s", diffname);
	if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
		elog(ERROR, "SPI_exec failed: %s", querybuf.data);

	OpenMatViewIncrementalMaintenance();

	/* Deletes must come before inserts; do them first. */
	resetStringInfo(&querybuf);
	appendStringInfo(&querybuf,
					 "DELETE FROM %s mv WHERE ctid OPERATOR(pg_catalog.=) ANY "
					 "(SELECT diff.tid FROM %s diff "
					 "WHERE diff.tid IS NOT NULL "
					 "AND diff.tid OPERATOR(pg_catalog.=) mv.ctid AND diff.sid "
					 "OPERATOR(pg_catalog.=) mv.gp_segment_id)",
					 matviewname, diffname);
	if (SPI_exec(querybuf.data, 0) != SPI_OK_DELETE)
		elog(ERROR, "SPI_exec failed: %s", querybuf.data);

	/* Inserts go last. */
	resetStringInfo(&querybuf);
	appendStringInfo(&querybuf, "INSERT INTO %s SELECT", matviewname);
	for (int i = 0; i < newHeapDesc->natts; ++i)
	{
		Form_pg_attribute attr = TupleDescAttr(newHeapDesc, i);

		/*
		 * The column name is spliced into SQL text, so it must go through
		 * quote_identifier(); otherwise a mixed-case, reserved-word, or
		 * otherwise non-trivial column name yields a broken (or different)
		 * statement.  Entries are comma-separated, with no trailing comma.
		 */
		appendStringInfo(&querybuf, "%s %s",
						 (i > 0) ? "," : "",
						 quote_identifier(NameStr(attr->attname)));
	}
	appendStringInfo(&querybuf,
					 " FROM %s diff WHERE tid IS NULL",
					 diffname);
	if (SPI_exec(querybuf.data, 0) != SPI_OK_INSERT)
		elog(ERROR, "SPI_exec failed: %s", querybuf.data);

	/* We're done maintaining the materialized view. */
	CloseMatViewIncrementalMaintenance();
	table_close(tempRel, NoLock);
	table_close(matviewRel, NoLock);

	/* Clean up temp tables. */
	resetStringInfo(&querybuf);
	appendStringInfo(&querybuf, "DROP TABLE %s, %s", diffname, tempname);
	if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
		elog(ERROR, "SPI_exec failed: %s", querybuf.data);

	/* Close SPI context. */
	if (SPI_finish() != SPI_OK_FINISH)
		elog(ERROR, "SPI_finish failed");
}
/*
 * Replace the matview's storage with the transient table's by handing both
 * to finish_heap_swap(), which swaps the physical files, rebuilds the
 * target's indexes and throws away the transient table.  Security context
 * swapping is handled by the called function, so it is not needed here.
 */
static void
refresh_by_heap_swap(Oid matviewOid, Oid OIDNewHeap, char relpersistence)
{
	/* Spell out the positional flags so the call below is self-describing. */
	const bool	is_system_catalog = false;
	const bool	swap_toast_by_content = false;
	const bool	swap_stats = false;
	const bool	check_constraints = true;
	const bool	is_internal = true;

	finish_heap_swap(matviewOid, OIDNewHeap,
					 is_system_catalog,
					 swap_toast_by_content,
					 swap_stats,
					 check_constraints,
					 is_internal,
					 RecentXmin, ReadNextMultiXactId(),
					 relpersistence);
}
/*
 * Decide whether the given index can drive the match-merge refresh.
 *
 * Returns true only for an index that is unique, immediate, btree, valid,
 * non-partial, has at least one column, and whose columns are all plain
 * user columns (no expressions, no system columns).
 */
static bool
is_usable_unique_index(Relation indexRel)
{
	Form_pg_index idx = indexRel->rd_index;
	int			col;

	/*
	 * Reject anything that is not a unique, immediate, valid, non-partial
	 * btree.  Btree is required because for any other unique index kind we
	 * would not know how to identify the corresponding equality operator,
	 * nor could we be sure the planner can implement the required FULL JOIN
	 * with non-btree operators.
	 */
	if (!idx->indisunique || !idx->indimmediate)
		return false;
	if (!IsIndexAccessMethod(indexRel->rd_rel->relam, BTREE_AM_OID))
		return false;
	if (!idx->indisvalid)
		return false;
	if (RelationGetIndexPredicate(indexRel) != NIL)
		return false;
	if (idx->indnatts <= 0)
		return false;

	/*
	 * Walk the index columns individually to reject both index expressions
	 * and system columns.  Currently matviews can't have OID columns, so
	 * there is no way to create an index on a system column; but that might
	 * change someday, so be safe.
	 */
	for (col = 0; col < idx->indnatts; col++)
	{
		if (idx->indkey.values[col] <= 0)
			return false;
	}

	return true;
}
/*
 * Report whether the backend is currently in a context where DML statements
 * may modify materialized views.  That should only be allowed for internal
 * code driven by the materialized view definition, never for arbitrary
 * user-supplied code.
 *
 * The function names refer to incremental maintenance (reacting to changes
 * in the referenced relations) because that is their main intended use, but
 * they are initially used to allow REFRESH without blocking concurrent
 * reads.
 *
 * Returns true unconditionally when Gp_role is GP_ROLE_EXECUTE; otherwise
 * true only while the maintenance depth counter is positive.
 */
bool
MatViewIncrementalMaintenanceIsEnabled(void)
{
	return (Gp_role == GP_ROLE_EXECUTE) || (matview_maintenance_depth > 0);
}
/*
 * Enter a matview maintenance section by bumping the nesting depth counter.
 */
static void
OpenMatViewIncrementalMaintenance(void)
{
	matview_maintenance_depth += 1;
}
/*
 * Leave a matview maintenance section by dropping the nesting depth counter;
 * the counter must never go negative.
 */
static void
CloseMatViewIncrementalMaintenance(void)
{
	matview_maintenance_depth -= 1;
	Assert(matview_maintenance_depth >= 0);
}
/*
 * get_matview_query - get the Query from a matview's _RETURN rule.
 *
 * Verifies that the relation has exactly one rewrite rule, that the rule is
 * a SELECT INSTEAD rule, and that it has exactly one action; that action is
 * returned.  Any violation is treated as an internal error and reported
 * with elog(ERROR).
 */
Query *
get_matview_query(Relation matviewRel)
{
	const char *mvname = RelationGetRelationName(matviewRel);
	RewriteRule *select_rule;

	/* The relation must carry rewrite information ... */
	if (!matviewRel->rd_rel->relhasrules ||
		matviewRel->rd_rules->numLocks < 1)
		elog(ERROR,
			 "materialized view \"%s\" is missing rewrite information",
			 mvname);

	/* ... consisting of one rule only. */
	if (matviewRel->rd_rules->numLocks > 1)
		elog(ERROR,
			 "materialized view \"%s\" has too many rules",
			 mvname);

	select_rule = matviewRel->rd_rules->rules[0];
	if (!(select_rule->event == CMD_SELECT && select_rule->isInstead))
		elog(ERROR,
			 "the rule for materialized view \"%s\" is not a SELECT INSTEAD OF rule",
			 mvname);

	if (list_length(select_rule->actions) != 1)
		elog(ERROR,
			 "the rule for materialized view \"%s\" is not a single action",
			 mvname);

	/*
	 * The stored query was rewritten at the time of the MV definition, but
	 * has not been scribbled on by the planner.
	 */
	return linitial_node(Query, select_rule->actions);
}
/* ----------------------------------------------------
* Incremental View Maintenance routines
* ---------------------------------------------------
*/
/*
* ivm_immediate_before
*
* IVM trigger function invoked before base table is modified. If this is
* invoked firstly in the same statement, we save the transaction id and the
* command id at that time.
*/
Datum
ivm_immediate_before(PG_FUNCTION_ARGS)
{
TriggerData *trigdata = (TriggerData *) fcinfo->context;
char *matviewOid_text = trigdata->tg_trigger->tgargs[0];
char *ex_lock_text = trigdata->tg_trigger->tgargs[1];
Oid matviewOid;
MV_TriggerHashEntry *entry;
bool found;
bool ex_lock;
ResourceOwner oldowner;
MemoryContext oldctx;
Relation rel = trigdata->tg_relation;
matviewOid = DatumGetObjectId(DirectFunctionCall1(oidin, CStringGetDatum(matviewOid_text)));
ex_lock = DatumGetBool(DirectFunctionCall1(boolin, CStringGetDatum(ex_lock_text)));
/* If the view has more than one tables, we have to use an exclusive lock. */
if (ex_lock)
{
/*
* Wait for concurrent transactions which update this materialized view at
* READ COMMITED. This is needed to see changes committed in other
* transactions. No wait and raise an error at REPEATABLE READ or
* SERIALIZABLE to prevent update anomalies of matviews.
* XXX: dead-lock is possible here.
*/
if (!IsolationUsesXactSnapshot())
LockRelationOid(matviewOid, ExclusiveLock);
else if (!ConditionalLockRelationOid(matviewOid, ExclusiveLock))
{
/* try to throw error by name; relation could be deleted... */
char *relname = get_rel_name(matviewOid);
if (!relname)
ereport(ERROR,
(errcode(ERRCODE_LOCK_NOT_AVAILABLE),
errmsg("could not obtain lock on materialized view during incremental maintenance")));
ereport(ERROR,
(errcode(ERRCODE_LOCK_NOT_AVAILABLE),
errmsg("could not obtain lock on materialized view \"%s\" during incremental maintenance",
relname)));
}
}
else
LockRelationOid(matviewOid, RowExclusiveLock);
entry = (MV_TriggerHashEntry *) hash_search(mv_trigger_info,
(void *) &matviewOid,
HASH_ENTER, &found);
/* On the first BEFORE to update the view, initialize trigger data */
if (!found)
{
/*
* Get a snapshot just before the table was modified for checking
* tuple visibility in the pre-update state of the table.
*/
entry->context = AllocSetContextCreate(TopMemoryContext,
"IVM Writer Session",
ALLOCSET_DEFAULT_SIZES);
entry->resowner = ResourceOwnerCreate(TopTransactionResourceOwner, "IVM Writer Session");
/* Change to session owner */
oldowner = CurrentResourceOwner;
CurrentResourceOwner = entry->resowner;
oldctx = MemoryContextSwitchTo(entry->context);
entry->snapname = (char*) palloc0(SNAPSHOT_KEYSIZE);
entry->matview_id = matviewOid;
entry->before_trig_count = 0;
entry->after_trig_count = 0;
entry->snapshot = NULL;
entry->tables = NIL;
entry->has_old = false;
entry->has_new = false;
entry->reference = 1;
entry->pid = MyProcPid;
entry->snapname[0] = '\0';
MemoryContextSwitchTo(oldctx);
CurrentResourceOwner = oldowner;
}
entry->before_trig_count++;
elogif(Debug_print_ivm, INFO, "IVM ivm_immediate_before ref %d, mvid:%d", entry->before_trig_count, matviewOid);
if (Gp_role == GP_ROLE_DISPATCH && !TRIGGER_FIRED_BY_TRUNCATE(trigdata->tg_event))
{
snprintf(entry->snapname, SNAPSHOT_KEYSIZE, "%08X-%08X-%d-%d",
MyProc->backendId, MyProc->lxid, gp_command_count, entry->before_trig_count);
ivm_export_snapshot(matviewOid, entry->snapname);
}
ivm_set_ts_persitent_name(trigdata, rel->rd_id, matviewOid);
return PointerGetDatum(NULL);
}
/*
* ivm_immediate_maintenance
*
* IVM trigger function invoked after base table is modified.
* For each table, tuplestores of transition tables are collected.
* and after the last modification
*/
Datum
ivm_immediate_maintenance(PG_FUNCTION_ARGS)
{
TriggerData *trigdata = (TriggerData *) fcinfo->context;
Relation rel;
Oid relid;
Oid matviewOid;
Query *query;
Query *rewritten = NULL;
char *matviewOid_text = trigdata->tg_trigger->tgargs[0];
Relation matviewRel;
int old_depth = matview_maintenance_depth;
Oid relowner;
Tuplestorestate *old_tuplestore = NULL;
Tuplestorestate *new_tuplestore = NULL;
DestReceiver *dest_new = NULL, *dest_old = NULL;
Oid save_userid;
int save_sec_context;
int save_nestlevel;
MV_TriggerHashEntry *entry;
MV_TriggerTable *table;
bool found;
ParseState *pstate;
MemoryContext oldcxt;
ListCell *lc;
int i;
ResourceOwner oldowner;
QueryEnvironment *queryEnv = create_queryEnv();
/* Create a ParseState for rewriting the view definition query */
pstate = make_parsestate(NULL);
pstate->p_queryEnv = queryEnv;
pstate->p_expr_kind = EXPR_KIND_SELECT_TARGET;
rel = trigdata->tg_relation;
relid = rel->rd_id;
matviewOid = DatumGetObjectId(DirectFunctionCall1(oidin, CStringGetDatum(matviewOid_text)));
/* get the entry for this materialized view */
entry = (MV_TriggerHashEntry *) hash_search(mv_trigger_info,
(void *) &matviewOid,
HASH_FIND, &found);
Assert (found && entry != NULL);
entry->after_trig_count++;
elogif(Debug_print_ivm, INFO, "IVM ivm_immediate_maintenance ref %d, mvid:%d", entry->after_trig_count, matviewOid);
oldowner = CurrentResourceOwner;
CurrentResourceOwner = entry->resowner;
oldcxt = MemoryContextSwitchTo(entry->context);
/* search the entry for the modified table and create new entry if not found */
found = false;
foreach(lc, entry->tables)
{
table = (MV_TriggerTable *) lfirst(lc);
if (table->table_id == relid)
{
found = true;
break;
}
}
if (!found)
{
table = (MV_TriggerTable *) palloc0(sizeof(MV_TriggerTable));
table->table_id = relid;
table->old_tuplestores = NIL;
table->new_tuplestores = NIL;
table->rte_indexes = NIL;
table->slot = MakeSingleTupleTableSlot(RelationGetDescr(rel), table_slot_callbacks(rel));
table->rel = table_open(RelationGetRelid(rel), NoLock);
entry->tables = lappend(entry->tables, table);
}
/* Save the transition tables and make a request to not free immediately */
if (trigdata->tg_oldtable)
{
table->old_tuplestores = lappend(table->old_tuplestores, trigdata->tg_oldtable);
entry->has_old = true;
}
if (trigdata->tg_newtable)
{
table->new_tuplestores = lappend(table->new_tuplestores, trigdata->tg_newtable);
entry->has_new = true;
}
if (entry->has_new || entry->has_old)
{
CmdType cmd;
if (TRIGGER_FIRED_BY_INSERT(trigdata->tg_event))
cmd = CMD_INSERT;
else if (TRIGGER_FIRED_BY_DELETE(trigdata->tg_event))
cmd = CMD_DELETE;
else if (TRIGGER_FIRED_BY_UPDATE(trigdata->tg_event))
cmd = CMD_UPDATE;
else
elog(ERROR,"unsupported trigger type");
/* Prolong lifespan of transition tables to the end of the last AFTER trigger */
SetTransitionTablePreserved(relid, cmd);
}
/* If this is not the last AFTER trigger call, immediately exit. */
Assert (entry->before_trig_count >= entry->after_trig_count);
if (entry->before_trig_count != entry->after_trig_count)
return PointerGetDatum(NULL);
/*
* If this is the last AFTER trigger call, continue and update the view.
*/
/*
* Advance command counter to make the updated base table row locally
* visible.
*/
CommandCounterIncrement();
matviewRel = table_open(matviewOid, NoLock);
/* Make sure it is a materialized view. */
Assert(matviewRel->rd_rel->relkind == RELKIND_MATVIEW);
/*
* Get and push the latast snapshot to see any changes which is committed
* during waiting in other transactions at READ COMMITTED level.
*/
PushActiveSnapshot(GetTransactionSnapshot());
/*
* Check for active uses of the relation in the current transaction, such
* as open scans.
*
* NB: We count on this to protect us against problems with refreshing the
* data using TABLE_INSERT_FROZEN.
*/
CheckTableNotInUse(matviewRel, "refresh a materialized view incrementally");
/*
* Switch to the owner's userid, so that any functions are run as that
* user. Also arrange to make GUC variable changes local to this command.
* We will switch modes when we are about to execute user code.
*/
relowner = matviewRel->rd_rel->relowner;
GetUserIdAndSecContext(&save_userid, &save_sec_context);
SetUserIdAndSecContext(relowner,
save_sec_context);
save_nestlevel = NewGUCNestLevel();
/* get view query*/
query = get_matview_query(matviewRel);
/*
* When a base table is truncated, the view content will be empty if the
* view definition query does not contain an outer join or an aggregate
* without a GROUP clause. Therefore, such views can be truncated.
*
* Aggregate views without a GROUP clause always have one row. Therefore,
* if a base table is truncated, the view will not be empty and will contain
* a row with NULL value (or 0 for count()). So, in this case, we refresh the
* view instead of truncating it.
*/
if (TRIGGER_FIRED_BY_TRUNCATE(trigdata->tg_event))
{
MemoryContextSwitchTo(oldcxt);
if (!(query->hasAggs && query->groupClause == NIL))
ExecuteTruncateGuts(list_make1(matviewRel), list_make1_oid(matviewOid),
NIL, DROP_RESTRICT, false, NULL);
else if (Gp_role == GP_ROLE_DISPATCH)
ExecuteTruncateGuts_IVM(matviewRel, matviewOid, query);
/* Clean up hash entry and delete tuplestores */
clean_up_IVM_hash_entry(entry, false);
/* Pop the original snapshot. */
PopActiveSnapshot();
table_close(matviewRel, NoLock);
/* Roll back any GUC changes */
AtEOXact_GUC(false, save_nestlevel);
/* Restore userid and security context */
SetUserIdAndSecContext(save_userid, save_sec_context);
/* Restore resource owner */
CurrentResourceOwner = oldowner;
return PointerGetDatum(NULL);
}
/*
* rewrite query for calculating deltas
*/
rewritten = copyObject(query);
/* Replace resnames in a target list with materialized view's attnames */
i = 0;
foreach (lc, rewritten->targetList)
{
TargetEntry *tle = (TargetEntry *) lfirst(lc);
Form_pg_attribute attr = TupleDescAttr(matviewRel->rd_att, i);
char *resname = NameStr(attr->attname);
tle->resname = pstrdup(resname);
i++;
}
/*
* Step1: collect transition tables in QEs and
* Set all tables in the query to pre-update state and
*/
rewritten = rewrite_query_for_preupdate_state(rewritten, entry->tables,
pstate, matviewOid);
/* Rewrite for counting duplicated tuples and aggregates functions*/
rewritten = rewrite_query_for_counting_and_aggregates(rewritten, pstate);
/* Create tuplestores to store view deltas */
if (entry->has_old)
{
MemoryContext cxt = MemoryContextSwitchTo(TopTransactionContext);
old_tuplestore = tuplestore_begin_heap(false, false, work_mem);
dest_old = CreateDestReceiver(DestTuplestore);
SetTuplestoreDestReceiverParams(dest_old,
old_tuplestore,
TopTransactionContext,
false,
NULL,
NULL);
MemoryContextSwitchTo(cxt);
}
if (entry->has_new)
{
MemoryContext cxt = MemoryContextSwitchTo(TopTransactionContext);
new_tuplestore = tuplestore_begin_heap(false, false, work_mem);
dest_new = CreateDestReceiver(DestTuplestore);
SetTuplestoreDestReceiverParams(dest_new,
new_tuplestore,
TopTransactionContext,
false,
NULL,
NULL);
MemoryContextSwitchTo(cxt);
}
/*
* Step2: calculate delta tables
* Step3: apply delta tables to the materialized view
*/
if (Gp_role == GP_ROLE_DISPATCH)
{
/* for all modified tables */
foreach(lc, entry->tables)
{
ListCell *lc2;
table = (MV_TriggerTable *) lfirst(lc);
/* loop for self-join */
foreach(lc2, table->rte_indexes)
{
int rte_index = lfirst_int(lc2);
TupleDesc tupdesc_old;
TupleDesc tupdesc_new;
Size snaplen = strlen(entry->snapname);
bool use_count = false;
char *count_colname = NULL;
char *old_enr = NULL;
char *new_enr = NULL;
count_colname = pstrdup("__ivm_count__");
if (query->hasAggs || query->distinctClause)
use_count = true;
configure_queryEnv(queryEnv, matviewOid, table->table_id, entry->snapname, snaplen);
/* calculate delta tables */
old_enr = calc_delta_old(old_tuplestore, matviewRel, table, rte_index, rewritten, dest_old,
&tupdesc_old, queryEnv);
new_enr = calc_delta_new(new_tuplestore, matviewRel, table, rte_index, rewritten, dest_new,
&tupdesc_new, queryEnv);
configure_queryEnv(queryEnv, InvalidOid, InvalidOid, NULL, 0);
/* Set the table in the query to post-update state */
rewritten = rewrite_query_for_postupdate_state(rewritten, table, rte_index);
PG_TRY();
{
/* apply the delta tables to the materialized view */
apply_delta(old_enr, new_enr, table, matviewOid, old_tuplestore, new_tuplestore,
tupdesc_old, tupdesc_new, query, use_count, count_colname);
}
PG_CATCH();
{
matview_maintenance_depth = old_depth;
PG_RE_THROW();
}
PG_END_TRY();
/* clear view delta tuplestores */
if (old_tuplestore)
tuplestore_clear(old_tuplestore);
if (new_tuplestore)
tuplestore_clear(new_tuplestore);
}
}
}
if (old_tuplestore)
{
dest_old->rDestroy(dest_old);
tuplestore_end(old_tuplestore);
}
if (new_tuplestore)
{
dest_new->rDestroy(dest_new);
tuplestore_end(new_tuplestore);
}
MemoryContextSwitchTo(oldcxt);
/* Pop the original snapshot. */
PopActiveSnapshot();
table_close(matviewRel, NoLock);
/* Roll back any GUC changes */
AtEOXact_GUC(false, save_nestlevel);
/* Restore userid and security context */
SetUserIdAndSecContext(save_userid, save_sec_context);
CurrentResourceOwner = oldowner;
/*
* Step4: cleanup stage
*/
if (Gp_role == GP_ROLE_DISPATCH)
{
apply_cleanup(matviewOid);
DirectFunctionCall1(ivm_immediate_cleanup, ObjectIdGetDatum(matviewOid));
}
return PointerGetDatum(NULL);
}
/*
 * rewrite_query_for_preupdate_state
 *
 * Rewrite the query so that the RTEs of modified base tables represent the
 * "pre-update" state of those tables.  This is necessary to calculate view
 * deltas after multiple tables are modified.
 *
 * Only the RTEs that were in the range table on entry are examined; the
 * count is taken before the delta ENRs are registered.  For each such RTE
 * that matches a modified table, the RTE is replaced in place and its
 * 1-based index is recorded in the table's rte_indexes list.
 *
 * The query is modified in place and also returned.
 */
static Query*
rewrite_query_for_preupdate_state(Query *query, List *tables,
								  ParseState *pstate, Oid matviewid)
{
	ListCell   *rte_cell;
	int			orig_rte_count = list_length(query->rtable);
	int			rt_index = 0;

	/* register delta ENRs */
	register_delta_ENRs(pstate, query, tables);

	/* XXX: Is necessary? Is this right timing? */
	AcquireRewriteLocks(query, true, false);

	foreach(rte_cell, query->rtable)
	{
		RangeTblEntry *cur_rte = (RangeTblEntry *) lfirst(rte_cell);
		ListCell   *tbl_cell;

		/* range table indexes are 1-based */
		rt_index++;

		foreach(tbl_cell, tables)
		{
			MV_TriggerTable *modified = (MV_TriggerTable *) lfirst(tbl_cell);
			RangeTblEntry *prestate_rte;
			List	   *securityQuals;
			List	   *withCheckOptions;
			bool		hasRowSecurity;
			bool		hasSubLinks;

			if (cur_rte->relid != modified->table_id)
				continue;

			/*
			 * Found a modified table: build its "pre-state" RTE, which will
			 * take the place of the original one.
			 */
			prestate_rte = get_prestate_rte(cur_rte, modified,
											pstate->p_queryEnv, matviewid);

			/*
			 * Carry the modified table's row security policies over to the
			 * RTE that represents its pre-update state.
			 */
			get_row_security_policies(query, modified->original_rte, rt_index,
									  &securityQuals, &withCheckOptions,
									  &hasRowSecurity, &hasSubLinks);

			if (hasRowSecurity)
			{
				query->hasRowSecurity = true;
				prestate_rte->security_barrier = true;
			}
			if (hasSubLinks)
				query->hasSubLinks = true;

			prestate_rte->securityQuals = securityQuals;

			/* swap the RTE in place and remember where it lives */
			lfirst(rte_cell) = prestate_rte;
			modified->rte_indexes = list_append_unique_int(modified->rte_indexes,
														   rt_index);
			break;
		}

		/* stop once every RTE of the original query has been processed */
		if (rt_index >= orig_rte_count)
			break;
	}

	return query;
}
/*
* register_delta_ENRs
*
* For all modified tables, make ENRs for their transition tables
* and register them to the queryEnv. ENR's RTEs are also appended
* into the list in query tree.
*/
static void
register_delta_ENRs(ParseState *pstate, Query *query, List *tables)
{
QueryEnvironment *queryEnv = pstate->p_queryEnv;
ListCell *lc;
RangeTblEntry *rte;
foreach(lc, tables)
{
MV_TriggerTable *table = (MV_TriggerTable *) lfirst(lc);
ListCell *lc2;
foreach(lc2, table->old_tuplestores)
{
Tuplestorestate *oldtable = (Tuplestorestate *) lfirst(lc2);
bool freezed = tuplestore_in_freezed(oldtable);
EphemeralNamedRelation enr =
palloc(sizeof(EphemeralNamedRelationData));
ParseNamespaceItem *nsitem;
char* shared_name = tuplestore_get_sharedname(oldtable);
if (freezed || shared_name)
enr->md.name = pstrdup(shared_name);
else
enr->md.name = MakeDeltaName("old", table->table_id, gp_command_count);
enr->md.reliddesc = table->table_id;
enr->md.tupdesc = CreateTupleDescCopy(table->rel->rd_att);
enr->md.enrtype = ENR_NAMED_TUPLESTORE;
enr->md.enrtuples = tuplestore_tuple_count(oldtable);
enr->reldata = NULL;
register_ENR(queryEnv, enr);
nsitem = addRangeTableEntryForENR(pstate, makeRangeVar(NULL, enr->md.name, -1), true);
rte = nsitem->p_rte;
query->rtable = list_append_unique_ptr(query->rtable, rte);
/* Note: already freezed case */
if (freezed)
{
continue;
}
tuplestore_make_sharedV2(oldtable,
get_shareinput_fileset(),
enr->md.name,
tuplestore_get_resowner(oldtable));
tuplestore_freeze(oldtable);
}
foreach(lc2, table->new_tuplestores)
{
Tuplestorestate *newtable = (Tuplestorestate *) lfirst(lc2);
bool freezed = tuplestore_in_freezed(newtable);
EphemeralNamedRelation enr =
palloc(sizeof(EphemeralNamedRelationData));
ParseNamespaceItem *nsitem;
char* shared_name = tuplestore_get_sharedname(newtable);
if (freezed || shared_name)
enr->md.name = pstrdup(shared_name);
else
enr->md.name = MakeDeltaName("new", table->table_id, gp_command_count);
enr->md.reliddesc = table->table_id;
enr->md.tupdesc = CreateTupleDescCopy(table->rel->rd_att);
enr->md.enrtype = ENR_NAMED_TUPLESTORE;
enr->md.enrtuples = tuplestore_tuple_count(newtable);
enr->reldata = NULL;
register_ENR(queryEnv, enr);
nsitem = addRangeTableEntryForENR(pstate, makeRangeVar(NULL, enr->md.name, -1), true);
rte = nsitem->p_rte;
query->rtable = list_append_unique_ptr(query->rtable, rte);
/* Note: already freezed case */
if (freezed)
{
continue;
}
tuplestore_make_sharedV2(newtable,
get_shareinput_fileset(),
enr->md.name,
tuplestore_get_resowner(newtable));
tuplestore_freeze(newtable);
}
}
}
/*
 * Local Datum helpers for ItemPointer values.
 *
 * DatumGetItemPointer interprets a Datum as a pointer and casts it to
 * ItemPointer; PG_GETARG_ITEMPOINTER fetches the n-th argument of the
 * current fmgr call in that form.
 */
#define DatumGetItemPointer(X) ((ItemPointer) DatumGetPointer(X))
#define PG_GETARG_ITEMPOINTER(n) DatumGetItemPointer(PG_GETARG_DATUM(n))
/*
* ivm_visible_in_prestate
*
* Check visibility of a tuple specified by the tableoid and item pointer
* using the snapshot taken just before the table was modified.
*/
Datum
ivm_visible_in_prestate(PG_FUNCTION_ARGS)
{
Oid tableoid = PG_GETARG_OID(0);
ItemPointer itemPtr = PG_GETARG_ITEMPOINTER(1);
Oid matviewOid = PG_GETARG_OID(2);
/*
CBDB_PG_15_FIXME: Here, we do not use the 4th argument of this function.
Either justify its existence by using it, or remove the 4th argument from
the function definition (catalog change).
*/
ListCell *lc;
bool result = true;
bool found = false;
MemoryContext oldcxt;
ResourceOwner oldowner;
MV_TriggerHashEntry *entry;
MV_TriggerTable *table = NULL;
entry = (MV_TriggerHashEntry *) hash_search(mv_trigger_info,
(void *) &matviewOid,
HASH_FIND, &found);
Assert (found && entry != NULL);
foreach(lc, entry->tables)
{
table = (MV_TriggerTable *) lfirst(lc);
if (table->table_id == tableoid)
break;
}
Assert (table != NULL);
if (table->rel == NULL && table->slot == NULL)
{
oldowner = CurrentResourceOwner;
CurrentResourceOwner = entry->resowner;
oldcxt = MemoryContextSwitchTo(entry->context);
/* Lazy open relation */
table->rel = table_open(tableoid, NoLock);
table->slot = MakeSingleTupleTableSlot(RelationGetDescr(table->rel), table_slot_callbacks(table->rel));
MemoryContextSwitchTo(oldcxt);
CurrentResourceOwner = oldowner;
}
result = table_tuple_fetch_row_version(table->rel, itemPtr, entry->snapshot, table->slot);
ExecClearTuple(table->slot);
elogif(Debug_print_ivm, INFO, "IVM tableoid: %d, ctid: %s, visible %d.", tableoid, ItemPointerToString(itemPtr), result);
PG_RETURN_BOOL(result);
}
/*
 * get_prestate_rte
 *
 * Rewrite RTE of the modified table to a subquery which represents
 * "pre-state" table. The original RTE is saved in table->original_rte.
 *
 * The subquery selects the rows of the table that pass
 * ivm_visible_in_prestate() and appends, via UNION ALL, the contents of
 * every old transition table of 'table' (referenced by the shared name of
 * its tuplestore).
 *
 * 'rte' is modified in place and returned.
 */
static RangeTblEntry*
get_prestate_rte(RangeTblEntry *rte, MV_TriggerTable *table,
				 QueryEnvironment *queryEnv, Oid matviewid)
{
	StringInfoData str;
	RawStmt    *raw;
	Query	   *subquery;
	Relation	rel;
	ParseState *pstate;
	char	   *relname;
	ListCell   *lc;

	pstate = make_parsestate(NULL);
	pstate->p_queryEnv = queryEnv;
	pstate->p_expr_kind = EXPR_KIND_SELECT_TARGET;

	/*
	 * We can use NoLock here since AcquireRewriteLocks should
	 * have locked the relation already.
	 */
	rel = table_open(table->table_id, NoLock);
	relname = quote_qualified_identifier(
					get_namespace_name(RelationGetNamespace(rel)),
					RelationGetRelationName(rel));
	table_close(rel, NoLock);

	/*
	 * Filtering inserted row using the snapshot taken before the table
	 * is modified. <ctid,gp_segment_id> is required for maintaining outer join views.
	 *
	 * The matview OID is unsigned, so it must be printed with %u: with %d an
	 * OID above INT_MAX would be emitted as a negative number.
	 */
	initStringInfo(&str);
	appendStringInfo(&str,
		"SELECT t.* FROM %s t"
		" WHERE pg_catalog.ivm_visible_in_prestate(t.tableoid, t.ctid, %u::pg_catalog.oid, t.gp_segment_id)",
			relname, matviewid);

	/*
	 * Append deleted rows contained in old transition tables.
	 */
	foreach(lc, table->old_tuplestores)
	{
		Tuplestorestate *tuplestore = (Tuplestorestate *) lfirst(lc);

		appendStringInfo(&str, " UNION ALL ");
		appendStringInfo(&str," SELECT * FROM %s",
			tuplestore_get_sharedname(tuplestore));
	}

	elogif(Debug_print_ivm, INFO, "IVM execute prestate visibilty chek new %s", str.data);

	/* Get a subquery representing pre-state of the table */
	raw = (RawStmt*)linitial(raw_parser(str.data, RAW_PARSE_DEFAULT));
	subquery = transformStmt(pstate, raw->stmt);

	/* save the original RTE */
	table->original_rte = copyObject(rte);

	rte->rtekind = RTE_SUBQUERY;
	rte->subquery = subquery;
	rte->security_barrier = false;

	/* Clear fields that should not be set in a subquery RTE */
	rte->relid = InvalidOid;
	rte->relkind = 0;
	rte->rellockmode = 0;
	rte->tablesample = NULL;
	rte->inh = false;			/* must not be set for a subquery */

	return rte;
}
/*
 * replace_rte_with_delta
 *
 * Replace RTE of the modified table with a single table delta that combines
 * all of its transition tables.
 *
 * 'is_new' selects the new transition tables of 'table'; otherwise the old
 * ones are used.  The selected tuplestores are referenced by their shared
 * names and combined with UNION ALL.  'rte' must already be the subquery RTE
 * representing the "pre-state" table; it is modified in place and returned.
 *
 * Raises an error if the selected list of transition tables is empty, since
 * no query could be built in that case.
 */
static RangeTblEntry*
replace_rte_with_delta(RangeTblEntry *rte, MV_TriggerTable *table, bool is_new,
					   QueryEnvironment *queryEnv)
{
	StringInfoData str;
	ParseState *pstate;
	RawStmt    *raw;
	Query	   *sub;
	bool		first = true;
	ListCell   *lc;
	List	   *ts = is_new ? table->new_tuplestores : table->old_tuplestores;

	/* the previous RTE must be a subquery which represents "pre-state" table */
	Assert(rte->rtekind == RTE_SUBQUERY);

	/*
	 * With no transition table the query text would be empty and the parser
	 * would hand back an empty list; fail cleanly instead.
	 */
	if (ts == NIL)
		elog(ERROR, "no %s transition table to build a delta from",
			 is_new ? "new" : "old");

	/* Create a ParseState for rewriting the view definition query */
	pstate = make_parsestate(NULL);
	pstate->p_queryEnv = queryEnv;
	pstate->p_expr_kind = EXPR_KIND_SELECT_TARGET;

	initStringInfo(&str);
	foreach(lc, ts)
	{
		Tuplestorestate *tuplestore = (Tuplestorestate *) lfirst(lc);

		if (!first)
			appendStringInfo(&str, " UNION ALL ");
		first = false;

		appendStringInfo(&str,
			" SELECT * FROM %s",
			tuplestore_get_sharedname(tuplestore));
	}

	raw = (RawStmt*)linitial(raw_parser(str.data, RAW_PARSE_DEFAULT));
	sub = transformStmt(pstate, raw->stmt);

	/*
	 * Update the subquery so that it represents the combined transition
	 * table.  The securityQuals field is left untouched, but note that
	 * security_barrier is reset to false here.
	 *
	 * NOTE(review): the comment that used to be here said security_barrier
	 * is kept as well so that the delta is protected by the RLS policy just
	 * like the modified table; the code does not do that.  Confirm which of
	 * the two is intended.
	 */
	rte->rtekind = RTE_SUBQUERY;
	rte->subquery = sub;
	rte->security_barrier = false;

	/* Clear fields that should not be set in a subquery RTE */
	rte->relid = InvalidOid;
	rte->relkind = 0;
	rte->rellockmode = 0;
	rte->tablesample = NULL;
	rte->inh = false;			/* must not be set for a subquery */

	return rte;
}
/*
 * rewrite_query_for_counting_and_aggregates
 *
 * Rewrite query for counting duplicated tuples and aggregate functions.
 *
 * For a query with aggregates, makeIvmAggColumn() is called for every
 * target entry whose expression is an Aggref, and the columns it collects
 * are appended to the target list.  For a query with neither GROUP BY nor
 * aggregates, a grouping clause is derived from the target list with
 * transformDistinctClause().  In all cases a count(*) column named
 * "__ivm_count__" is appended last and query->hasAggs is set.
 *
 * The query is modified in place and returned.
 */
static Query *
rewrite_query_for_counting_and_aggregates(Query *query, ParseState *pstate)
{
	FuncCall   *count_call;
	Node	   *count_expr;
	TargetEntry *count_tle;

	/* For aggregate views */
	if (query->hasAggs)
	{
		List	   *extra_cols = NIL;
		AttrNumber	next_resno = list_length(query->targetList) + 1;
		ListCell   *cell;

		foreach(cell, query->targetList)
		{
			TargetEntry *entry = (TargetEntry *) lfirst(cell);

			if (!IsA(entry->expr, Aggref))
				continue;

			makeIvmAggColumn(pstate, (Aggref *) entry->expr, entry->resname,
							 &next_resno, &extra_cols);
		}
		query->targetList = list_concat(query->targetList, extra_cols);
	}

	/* Without GROUP BY and aggregates, group by the whole target list */
	if (!(query->groupClause || query->hasAggs))
		query->groupClause = transformDistinctClause(NULL, &query->targetList, query->sortClause, false);

	/* Add count(*) for counting distinct tuples in views */
	count_call = makeFuncCall(SystemFuncName("count"), NIL, COERCE_EXPLICIT_CALL, -1);
	count_call->agg_star = true;
	count_expr = ParseFuncOrColumn(pstate, count_call->funcname, NIL, NULL,
								   count_call, false, -1);

	count_tle = makeTargetEntry((Expr *) count_expr,
								list_length(query->targetList) + 1,
								pstrdup("__ivm_count__"),
								false);
	query->targetList = lappend(query->targetList, count_tle);
	query->hasAggs = true;

	return query;
}
/*
* calc_delta
*
* Calculate view deltas generated under the modification of a table specified
* by the RTE index.
*/
static char*
calc_delta_old(Tuplestorestate *ts, Relation matviewRel, MV_TriggerTable *table, int rte_index, Query *query,
DestReceiver *dest_old,
TupleDesc *tupdesc_old,
QueryEnvironment *queryEnv)
{
ListCell *lc = list_nth_cell(query->rtable, rte_index - 1);
RangeTblEntry *rte = (RangeTblEntry *) lfirst(lc);
uint64 es_processed = 0;
char *enrname = NULL;
/* Generate old delta */
if (list_length(table->old_tuplestores) > 0)
{
/* Replace the modified table with the old delta table and calculate the old view delta. */
replace_rte_with_delta(rte, table, false, queryEnv);
enrname = MakeDeltaName(OLD_DELTA_ENRNAME, RelationGetRelid(matviewRel), gp_command_count);
es_processed = refresh_matview_memoryfill(dest_old, query, queryEnv,
tupdesc_old, enrname, matviewRel);
if (ts)
tuplestore_set_tuplecount(ts, es_processed);
}
return enrname;
}
static char*
calc_delta_new(Tuplestorestate *ts, Relation matviewRel, MV_TriggerTable *table, int rte_index, Query *query,
DestReceiver *dest_new,
TupleDesc *tupdesc_new,
QueryEnvironment *queryEnv)
{
ListCell *lc = list_nth_cell(query->rtable, rte_index - 1);
RangeTblEntry *rte = (RangeTblEntry *) lfirst(lc);
uint64 es_processed = 0;
char *enrname = NULL;
/* Generate new delta */
if (list_length(table->new_tuplestores) > 0)
{
/* Replace the modified table with the new delta table and calculate the new view delta*/
replace_rte_with_delta(rte, table, true, queryEnv);
enrname = MakeDeltaName(NEW_DELTA_ENRNAME, RelationGetRelid(matviewRel), gp_command_count);
es_processed = refresh_matview_memoryfill(dest_new, query, queryEnv,
tupdesc_new, enrname, matviewRel);
if (ts)
tuplestore_set_tuplecount(ts, es_processed);
}
return enrname;
}
/*
 * rewrite_query_for_postupdate_state
 *
 * Rewrite the query so that the specified base table's RTE represents the
 * "post-update" state of the table.  This is called after the view delta
 * calculation due to changes on this table finishes.
 *
 * 'rte_index' is the 1-based position of the RTE in query->rtable.  The
 * query is modified in place and returned.
 */
static Query*
rewrite_query_for_postupdate_state(Query *query, MV_TriggerTable *table, int rte_index)
{
	/* Restore the original RTE saved in the trigger table entry */
	lfirst(list_nth_cell(query->rtable, rte_index - 1)) = table->original_rte;

	return query;
}
/*
 * IVM_colname
 *
 * Build the name of a hidden IVM column that accompanies column 'col'.
 * 'type' must be a string literal (e.g. "count" or "sum"), because it is
 * glued to the "__ivm_" prefix by literal concatenation.  The pieces are
 * combined by makeObjectName(); presumably the result looks like
 * "__ivm_<type>_<col>__" -- confirm against makeObjectName().
 */
#define IVM_colname(type, col) makeObjectName("__ivm_" type, col, "_")
/*
* apply_delta
*
* Apply deltas to the materialized view. In outer join cases, this requires
* the view maintenance graph.
*/
static void
apply_delta(char *old_enr, char *new_enr, MV_TriggerTable *table, Oid matviewOid,
Tuplestorestate *old_tuplestores, Tuplestorestate *new_tuplestores,
TupleDesc tupdesc_old, TupleDesc tupdesc_new,
Query *query, bool use_count, char *count_colname)
{
StringInfoData querybuf;
StringInfoData target_list_buf;
StringInfo aggs_list_buf = NULL;
StringInfo aggs_set_old = NULL;
StringInfo aggs_set_new = NULL;
Relation matviewRel;
char *matviewname;
ListCell *lc;
int i;
List *keys = NIL;
/*
* get names of the materialized view and delta tables
*/
matviewRel = table_open(matviewOid, NoLock);
matviewname = quote_qualified_identifier(get_namespace_name(RelationGetNamespace(matviewRel)),
RelationGetRelationName(matviewRel));
/*
* Build parts of the maintenance queries
*/
initStringInfo(&querybuf);
initStringInfo(&target_list_buf);
if (query->hasAggs)
{
if (old_tuplestores && tuplestore_tuple_count(old_tuplestores) > 0)
aggs_set_old = makeStringInfo();
if (new_tuplestores && tuplestore_tuple_count(new_tuplestores) > 0)
aggs_set_new = makeStringInfo();
aggs_list_buf = makeStringInfo();
}
/* build string of target list */
for (i = 0; i < matviewRel->rd_att->natts; i++)
{
Form_pg_attribute attr = TupleDescAttr(matviewRel->rd_att, i);
char *resname = NameStr(attr->attname);
if (i != 0)
appendStringInfo(&target_list_buf, ", ");
appendStringInfo(&target_list_buf, "%s", quote_qualified_identifier(NULL, resname));
}
i = 0;
foreach (lc, query->targetList)
{
TargetEntry *tle = (TargetEntry *) lfirst(lc);
Form_pg_attribute attr = TupleDescAttr(matviewRel->rd_att, i);
char *resname = NameStr(attr->attname);
i++;
if (tle->resjunk)
continue;
/*
* For views without aggregates, all attributes are used as keys to identify a
* tuple in a view.
*/
if (!query->hasAggs)
keys = lappend(keys, attr);
/* For views with aggregates, we need to build SET clause for updating aggregate
* values. */
if (query->hasAggs && IsA(tle->expr, Aggref))
{
Aggref *aggref = (Aggref *) tle->expr;
const char *aggname = get_func_name(aggref->aggfnoid);
/*
* We can use function names here because it is already checked if these
* can be used in IMMV by its OID at the definition time.
*/
/* count */
if (!strcmp(aggname, "count"))
append_set_clause_for_count(resname, aggs_set_old, aggs_set_new, aggs_list_buf);
/* sum */
else if (!strcmp(aggname, "sum"))
append_set_clause_for_sum(resname, aggs_set_old, aggs_set_new, aggs_list_buf);
/* avg */
else if (!strcmp(aggname, "avg"))
append_set_clause_for_avg(resname, aggs_set_old, aggs_set_new, aggs_list_buf,
format_type_be(aggref->aggtype));
else
elog(ERROR, "unsupported aggregate function: %s", aggname);
}
}
/* If we have GROUP BY clause, we use its entries as keys. */
if (query->hasAggs && query->groupClause)
{
foreach (lc, query->groupClause)
{
SortGroupClause *sgcl = (SortGroupClause *) lfirst(lc);
TargetEntry *tle = get_sortgroupclause_tle(sgcl, query->targetList);
Form_pg_attribute attr = TupleDescAttr(matviewRel->rd_att, tle->resno - 1);
keys = lappend(keys, attr);
}
}
/* Start maintaining the materialized view. */
OpenMatViewIncrementalMaintenance();
/* Open SPI context. */
if (SPI_connect() != SPI_OK_CONNECT)
elog(ERROR, "SPI_connect failed");
/* For tuple deletion */
if (old_tuplestores && tuplestore_tuple_count(old_tuplestores) > 0)
{
int rc;
EphemeralNamedRelation enr = palloc(sizeof(EphemeralNamedRelationData));
/* convert tuplestores to ENR, and register for SPI */
enr->md.name = pstrdup(old_enr);
enr->md.reliddesc = matviewOid;
enr->md.tupdesc = CreateTupleDescCopy(tupdesc_old);
enr->md.enrtype = ENR_NAMED_TUPLESTORE;
enr->md.enrtuples = tuplestore_tuple_count(old_tuplestores);
enr->reldata = NULL;
rc = SPI_register_relation(enr);
if (rc != SPI_OK_REL_REGISTER)
elog(ERROR, "SPI_register failed");
elogif(Debug_print_ivm, INFO, "IVM apply old enr %s, command_count: %d", enr->md.name, gp_command_count);
if (use_count)
/* apply old delta and get rows to be recalculated */
apply_old_delta_with_count(matviewname, RelationGetRelid(matviewRel), enr->md.name,
keys, aggs_list_buf, aggs_set_old,
count_colname);
else
apply_old_delta(matviewname, enr->md.name, keys);
}
/* For tuple insertion */
if (new_tuplestores && tuplestore_tuple_count(new_tuplestores) > 0)
{
int rc;
EphemeralNamedRelation enr = palloc(sizeof(EphemeralNamedRelationData));
/* convert tuplestores to ENR, and register for SPI */
enr->md.name = pstrdup(new_enr);
enr->md.reliddesc = matviewOid;
enr->md.tupdesc = CreateTupleDescCopy(tupdesc_new);
enr->md.enrtype = ENR_NAMED_TUPLESTORE;
enr->md.enrtuples = tuplestore_tuple_count(new_tuplestores);
enr->reldata = NULL;
rc = SPI_register_relation(enr);
if (rc != SPI_OK_REL_REGISTER)
elog(ERROR, "SPI_register failed");
elogif(Debug_print_ivm, INFO, "IVM apply new enr %s, command_count: %d", enr->md.name, gp_command_count);
/* apply new delta */
if (use_count)
apply_new_delta_with_count(matviewname, enr->md.name,
keys, &target_list_buf,
aggs_set_new, count_colname);
else
apply_new_delta(matviewname, enr->md.name, &target_list_buf);
}
/* We're done maintaining the materialized view. */
CloseMatViewIncrementalMaintenance();
table_close(matviewRel, NoLock);
/* Close SPI context. */
if (SPI_finish() != SPI_OK_FINISH)
elog(ERROR, "SPI_finish failed");
}
/*
 * append_set_clause_for_count
 *
 * Append SET clause string for count aggregation to given buffers.
 * Also, append resnames required for calculating the aggregate value.
 *
 * 'buf_old' and 'buf_new' receive the clause used when applying the old and
 * the new delta respectively; either may be NULL, in which case it is
 * skipped.  'aggs_list' always receives ", diff.<resname>".
 */
static void
append_set_clause_for_count(const char *resname, StringInfo buf_old,
							StringInfo buf_new,StringInfo aggs_list)
{
	/* For tuple deletion: resname = mv.resname - t.resname */
	if (buf_old != NULL)
	{
		char	   *target = quote_qualified_identifier(NULL, resname);
		char	   *value = get_operation_string(IVM_SUB, resname, "mv", "t", NULL, NULL);

		appendStringInfo(buf_old, ", %s = %s", target, value);
	}

	/* For tuple insertion: resname = mv.resname + diff.resname */
	if (buf_new != NULL)
	{
		char	   *target = quote_qualified_identifier(NULL, resname);
		char	   *value = get_operation_string(IVM_ADD, resname, "mv", "diff", NULL, NULL);

		appendStringInfo(buf_new, ", %s = %s", target, value);
	}

	appendStringInfo(aggs_list, ", %s",
					 quote_qualified_identifier("diff", resname));
}
/*
 * append_set_clause_for_sum
 *
 * Append SET clause string for sum aggregation to given buffers.
 * Also, append resnames required for calculating the aggregate value.
 *
 * Besides the sum column itself, the hidden count column built with
 * IVM_colname("count", resname) is maintained.  'buf_old' and 'buf_new' may
 * be NULL and are then skipped; 'aggs_list' always receives the diff-side
 * names of both columns.
 */
static void
append_set_clause_for_sum(const char *resname, StringInfo buf_old,
						  StringInfo buf_new, StringInfo aggs_list)
{
	char	   *cnt_name = IVM_colname("count", resname);

	/* For tuple deletion */
	if (buf_old != NULL)
	{
		/* sum = mv.sum - t.sum */
		char	   *new_sum = get_operation_string(IVM_SUB, resname, "mv", "t", cnt_name, NULL);
		/* count = mv.count - t.count */
		char	   *new_cnt = get_operation_string(IVM_SUB, cnt_name, "mv", "t", NULL, NULL);

		appendStringInfo(buf_old, ", %s = %s",
						 quote_qualified_identifier(NULL, resname), new_sum);
		appendStringInfo(buf_old, ", %s = %s",
						 quote_qualified_identifier(NULL, cnt_name), new_cnt);
	}

	/* For tuple insertion */
	if (buf_new != NULL)
	{
		/* sum = mv.sum + diff.sum */
		char	   *new_sum = get_operation_string(IVM_ADD, resname, "mv", "diff", cnt_name, NULL);
		/* count = mv.count + diff.count */
		char	   *new_cnt = get_operation_string(IVM_ADD, cnt_name, "mv", "diff", NULL, NULL);

		appendStringInfo(buf_new, ", %s = %s",
						 quote_qualified_identifier(NULL, resname), new_sum);
		appendStringInfo(buf_new, ", %s = %s",
						 quote_qualified_identifier(NULL, cnt_name), new_cnt);
	}

	appendStringInfo(aggs_list, ", %s, %s",
					 quote_qualified_identifier("diff", resname),
					 quote_qualified_identifier("diff", cnt_name));
}
/*
 * append_set_clause_for_avg
 *
 * Append SET clause string for avg aggregation to given buffers.
 * Also, append resnames required for calculating the aggregate value.
 *
 * The average is recomputed from the hidden sum and count columns built
 * with IVM_colname("sum", resname) and IVM_colname("count", resname); the
 * updated sum is cast to 'aggtype' before the division.  'buf_old' and
 * 'buf_new' may be NULL and are then skipped; 'aggs_list' always receives
 * the diff-side names of the avg, sum and count columns.
 */
static void
append_set_clause_for_avg(const char *resname, StringInfo buf_old,
						  StringInfo buf_new, StringInfo aggs_list,
						  const char *aggtype)
{
	char	   *sum_name = IVM_colname("sum", resname);
	char	   *cnt_name = IVM_colname("count", resname);

	/* For tuple deletion */
	if (buf_old != NULL)
	{
		char	   *sum_as_aggtype = get_operation_string(IVM_SUB, sum_name, "mv", "t", cnt_name, aggtype);
		char	   *new_sum = get_operation_string(IVM_SUB, sum_name, "mv", "t", cnt_name, NULL);
		char	   *new_cnt = get_operation_string(IVM_SUB, cnt_name, "mv", "t", NULL, NULL);

		/* avg = (mv.sum - t.sum)::aggtype / (mv.count - t.count) */
		appendStringInfo(buf_old, ", %s = %s OPERATOR(pg_catalog./) %s",
						 quote_qualified_identifier(NULL, resname),
						 sum_as_aggtype, new_cnt);
		/* sum = mv.sum - t.sum */
		appendStringInfo(buf_old, ", %s = %s",
						 quote_qualified_identifier(NULL, sum_name), new_sum);
		/* count = mv.count - t.count */
		appendStringInfo(buf_old, ", %s = %s",
						 quote_qualified_identifier(NULL, cnt_name), new_cnt);
	}

	/* For tuple insertion */
	if (buf_new != NULL)
	{
		char	   *sum_as_aggtype = get_operation_string(IVM_ADD, sum_name, "mv", "diff", cnt_name, aggtype);
		char	   *new_sum = get_operation_string(IVM_ADD, sum_name, "mv", "diff", cnt_name, NULL);
		char	   *new_cnt = get_operation_string(IVM_ADD, cnt_name, "mv", "diff", NULL, NULL);

		/* avg = (mv.sum + diff.sum)::aggtype / (mv.count + diff.count) */
		appendStringInfo(buf_new, ", %s = %s OPERATOR(pg_catalog./) %s",
						 quote_qualified_identifier(NULL, resname),
						 sum_as_aggtype, new_cnt);
		/* sum = mv.sum + diff.sum */
		appendStringInfo(buf_new, ", %s = %s",
						 quote_qualified_identifier(NULL, sum_name), new_sum);
		/* count = mv.count + diff.count */
		appendStringInfo(buf_new, ", %s = %s",
						 quote_qualified_identifier(NULL, cnt_name), new_cnt);
	}

	appendStringInfo(aggs_list, ", %s, %s, %s",
					 quote_qualified_identifier("diff", resname),
					 quote_qualified_identifier("diff", sum_name),
					 quote_qualified_identifier("diff", cnt_name));
}
/*
 * get_operation_string
 *
 * Build a string to calculate the new aggregate values.
 *
 * The expression combines column 'col' of relation alias 'arg1' with the
 * same column of alias 'arg2', using '-' for IVM_SUB and '+' otherwise.
 * When 'castType' is given, "::castType" is applied to the result of the
 * operation.  When 'count_col' is given, the expression is wrapped in a
 * CASE that yields NULL under the condition built by
 * get_null_condition_string() and falls back to the other operand when one
 * of them is NULL.
 *
 * Returns the expression text.
 */
static char *
get_operation_string(IvmOp op, const char *col, const char *arg1, const char *arg2,
					 const char* count_col, const char *castType)
{
	char	   *lhs = quote_qualified_identifier(arg1, col);
	char	   *rhs = quote_qualified_identifier(arg2, col);
	char		sign = (op == IVM_SUB) ? '-' : '+';
	StringInfoData cast_suffix;
	StringInfoData expr;
	char	   *null_cond;

	/* stays an empty string when no cast is requested */
	initStringInfo(&cast_suffix);
	if (castType != NULL)
		appendStringInfo(&cast_suffix, "::%s", castType);

	initStringInfo(&expr);

	/*
	 * If the attributes don't have count columns then calc the result
	 * by using the operator simply.
	 */
	if (count_col == NULL)
	{
		appendStringInfo(&expr, "(%s OPERATOR(pg_catalog.%c) %s)%s",
						 lhs, sign, rhs, cast_suffix.data);
		return expr.data;
	}

	/*
	 * If the attributes have count columns then consider the condition
	 * where the result becomes NULL.
	 */
	null_cond = get_null_condition_string(op, arg1, arg2, count_col);
	appendStringInfo(&expr,
					 "(CASE WHEN %s THEN NULL "
					 "WHEN %s IS NULL THEN %s "
					 "WHEN %s IS NULL THEN %s "
					 "ELSE (%s OPERATOR(pg_catalog.%c) %s)%s END)",
					 null_cond,
					 lhs, rhs,
					 rhs, lhs,
					 lhs, sign, rhs, cast_suffix.data);

	return expr.data;
}
/*
 * get_null_condition_string
 *
 * Build a predicate string for CASE clause to check if an aggregate value
 * will become NULL after the given operation is applied.
 *
 * For IVM_ADD the predicate is that the count column is 0 on both sides;
 * for IVM_SUB it is that the count columns of both sides are equal.  Any
 * other operation raises an error.
 */
static char *
get_null_condition_string(IvmOp op, const char *arg1, const char *arg2,
						  const char* count_col)
{
	StringInfoData cond;

	initStringInfo(&cond);

	if (op == IVM_ADD)
	{
		char	   *left_cnt = quote_qualified_identifier(arg1, count_col);
		char	   *right_cnt = quote_qualified_identifier(arg2, count_col);

		appendStringInfo(&cond,
						 "%s OPERATOR(pg_catalog.=) 0 AND %s OPERATOR(pg_catalog.=) 0",
						 left_cnt, right_cnt);
	}
	else if (op == IVM_SUB)
	{
		char	   *left_cnt = quote_qualified_identifier(arg1, count_col);
		char	   *right_cnt = quote_qualified_identifier(arg2, count_col);

		appendStringInfo(&cond,
						 "%s OPERATOR(pg_catalog.=) %s",
						 left_cnt, right_cnt);
	}
	else
		elog(ERROR,"unknown operation");

	return cond.data;
}
/*
* apply_old_delta_with_count
*
* Execute a query for applying a delta table given by deltaname_old
* which contains tuples to be deleted from a materialized view given by
* matviewname. This is used when counting is required, that is, the view
* has aggregate or distinct.
*
* If the view doesn't have aggregates or has GROUP BY, this requires a keys
* list to identify a tuple in the view. If the view has aggregates, this
* requires strings representing resnames of aggregates and SET clause for
* updating aggregate values.
*
* The active (#else) implementation runs four statements through SPI:
* it materializes the matching view rows into a temporary table, UPDATEs
* the rows that are not to be deleted, DELETEs the ones that are, and
* finally drops the temporary table. matviewRelid is only used to build
* the temporary table's name.
*/
static void
apply_old_delta_with_count(const char *matviewname, Oid matviewRelid, const char *deltaname_old,
List *keys, StringInfo aggs_list, StringInfo aggs_set,
const char *count_colname)
{
const char * tempTableName;
StringInfoData querybuf;
char *match_cond;
/* no key columns at all: treated as an aggregate without GROUP BY */
bool agg_without_groupby = (list_length(keys) == 0);
/* build WHERE condition for searching tuples to be deleted */
match_cond = get_matching_condition_string(keys);
/* CBDB_IVM_FIXME CBDB does not support multiple-write CTE. Revert to original
* query when it will be supported.
*/
/*
* NOTE(review): the disabled branch below never assigns tempTableName, while
* the DROP TABLE after #endif uses it; that has to be dealt with when this
* branch is re-enabled.
*/
#if 0
initStringInfo(&querybuf);
appendStringInfo(&querybuf,
"WITH t AS (" /* collecting tid of target tuples in the view */
"SELECT diff.%s, " /* count column */
"(diff.%s OPERATOR(pg_catalog.=) mv.%s AND %s) AS for_dlt, "
"mv.ctid "
"%s " /* aggregate columns */
"FROM %s AS mv, %s AS diff "
"WHERE %s" /* tuple matching condition */
"), updt AS (" /* update a tuple if this is not to be deleted */
"UPDATE %s AS mv SET %s = mv.%s OPERATOR(pg_catalog.-) t.%s "
"%s" /* SET clauses for aggregates */
"FROM t WHERE mv.ctid OPERATOR(pg_catalog.=) t.ctid AND NOT for_dlt "
")"
/* delete a tuple if this is to be deleted */
"DELETE FROM %s AS mv USING t "
"WHERE mv.ctid OPERATOR(pg_catalog.=) t.ctid AND for_dlt",
count_colname,
count_colname, count_colname, (agg_without_groupby ? "false" : "true"),
(aggs_list != NULL ? aggs_list->data : ""),
matviewname, deltaname_old,
match_cond,
matviewname, count_colname, count_colname, count_colname,
(aggs_set != NULL ? aggs_set->data : ""),
matviewname);
#else
/* CBDB_IVM_FIXME: use tuplestore to replace temp table. */
tempTableName = MakeDeltaName("temp_old_delta", matviewRelid, gp_command_count);
/*
* Step 1: collect, for every view row matching a delta row, the delta's
* count, the row's (ctid, gp_segment_id) and the delta's aggregate columns.
* for_dlt is true when the delta's count equals the view row's count; it is
* forced to false when there are no key columns.
*/
initStringInfo(&querybuf);
appendStringInfo(&querybuf,
"CREATE TEMP TABLE %s AS SELECT diff.%s, " /* count column */
"(diff.%s OPERATOR(pg_catalog.=) mv.%s AND %s) AS for_dlt, "
"mv.ctid AS tid, mv.gp_segment_id AS gid"
"%s " /* aggregate columns */
"FROM %s AS mv, %s AS diff "
"WHERE %s DISTRIBUTED RANDOMLY", /* tuple matching condition */
tempTableName,
count_colname,
count_colname, count_colname, (agg_without_groupby ? "false" : "true"),
(aggs_list != NULL ? aggs_list->data : ""),
matviewname, deltaname_old,
match_cond);
/* Create the temporary table. */
if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
elog(ERROR, "SPI_exec failed: %s", querybuf.data);
elogif(Debug_print_ivm, INFO, "IVM apply_old_delta_with_count select: %s", querybuf.data);
/* Search for matching tuples from the view and update or delete if found. */
/* Step 2: rows not marked for_dlt get their count and aggregates decreased. */
resetStringInfo(&querybuf);
appendStringInfo(&querybuf,
"UPDATE %s AS mv SET %s = mv.%s OPERATOR(pg_catalog.-) t.%s "
"%s" /* SET clauses for aggregates */
"FROM %s t WHERE mv.ctid OPERATOR(pg_catalog.=) t.tid"
" AND mv.gp_segment_id OPERATOR(pg_catalog.=) t.gid"
" AND NOT for_dlt ",
matviewname, count_colname, count_colname, count_colname,
(aggs_set != NULL ? aggs_set->data : ""), tempTableName);
if (SPI_exec(querybuf.data, 0) != SPI_OK_UPDATE)
elog(ERROR, "SPI_exec failed: %s", querybuf.data);
elogif(Debug_print_ivm, INFO, "IVM apply_old_delta_with_count update: %s", querybuf.data);
/* Step 3: rows marked for_dlt are removed from the view. */
resetStringInfo(&querybuf);
appendStringInfo(&querybuf,
"DELETE FROM %s AS mv USING %s t "
"WHERE mv.ctid OPERATOR(pg_catalog.=) t.tid"
" AND mv.gp_segment_id OPERATOR(pg_catalog.=) t.gid"
" AND for_dlt",
matviewname, tempTableName);
#endif
/* Executes the DELETE built last in whichever branch above is compiled in. */
if (SPI_exec(querybuf.data, 0) != SPI_OK_DELETE)
elog(ERROR, "SPI_exec failed: %s", querybuf.data);
elogif(Debug_print_ivm, INFO, "IVM apply_old_delta_with_count delete: %s", querybuf.data);
/* Clean up temp tables. */
resetStringInfo(&querybuf);
appendStringInfo(&querybuf, "DROP TABLE %s", tempTableName);
if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
elog(ERROR, "SPI_exec failed: %s", querybuf.data);
}
/*
 * apply_old_delta
 *
 * Execute a query for applying a delta table given by deltaname_old
 * which contains tuples to be deleted from a materialized view given by
 * matviewname. This is used when counting is not required.
 *
 * View rows matching a delta row on all 'keys' are numbered within each
 * group of identical key values; the rows whose number does not exceed the
 * delta's "__ivm_count__" are deleted, identified by (ctid, gp_segment_id).
 */
static void
apply_old_delta(const char *matviewname, const char *deltaname_old,
				List *keys)
{
	StringInfoData sql;
	StringInfoData partition_cols;
	ListCell   *cell;
	char	   *where_match;
	int			rc;

	/* build WHERE condition for searching tuples to be deleted */
	where_match = get_matching_condition_string(keys);

	/* build the comma separated list of key columns, qualified with "mv" */
	initStringInfo(&partition_cols);
	foreach (cell, keys)
	{
		Form_pg_attribute key_attr = (Form_pg_attribute) lfirst(cell);

		/* a separator goes before every entry but the first */
		if (partition_cols.len > 0)
			appendStringInfo(&partition_cols, ", ");
		appendStringInfo(&partition_cols, "%s",
						 quote_qualified_identifier("mv", NameStr(key_attr->attname)));
	}

	/* Search for matching tuples from the view and delete them if found. */
	initStringInfo(&sql);
	appendStringInfo(&sql,
	"DELETE FROM %s WHERE (ctid, gp_segment_id) IN ("
		"SELECT tid, sid FROM (SELECT pg_catalog.row_number() over (partition by %s) AS \"__ivm_row_number__\","
								  "mv.ctid AS tid, mv.gp_segment_id as sid,"
								  "diff.\"__ivm_count__\""
						 "FROM %s AS mv, %s AS diff "
						 "WHERE %s) v "
					"WHERE v.\"__ivm_row_number__\" OPERATOR(pg_catalog.<=) v.\"__ivm_count__\")",
					matviewname,
					partition_cols.data,
					matviewname, deltaname_old,
					where_match);

	rc = SPI_exec(sql.data, 0);
	if (rc != SPI_OK_DELETE)
		elog(ERROR, "SPI_exec failed: %s", sql.data);

	elogif(Debug_print_ivm, INFO, "IVM apply_old_delta delete: %s", sql.data);
}
/*
* apply_new_delta_with_count
*
* Execute a query for applying a delta table given by deltaname_new
* which contains tuples to be inserted into a materialized view given by
* matviewname. This is used when counting is required, that is, the view
* has aggregate or distinct. Also, when a table in EXISTS sub queries
* is modified.
*
* If the view doesn't have aggregates or has GROUP BY, this requires a keys
* list to identify a tuple in the view. If the view has aggregates, this
* requires strings representing SET clause for updating aggregate values.
*
* The active (#else) implementation runs two statements through SPI: an
* UPDATE that adds the delta's count (and aggregates) to the matching view
* rows, followed by an INSERT of the delta rows for which the view has no
* matching row.
*/
static void
apply_new_delta_with_count(const char *matviewname, const char* deltaname_new,
List *keys, StringInfo target_list, StringInfo aggs_set,
const char* count_colname)
{
StringInfoData querybuf;
StringInfoData returning_keys;
ListCell *lc;
char *match_cond = "";
/* build WHERE condition for searching tuples to be updated */
match_cond = get_matching_condition_string(keys);
/* build string of keys list */
/* returning_keys is consumed only by the disabled (#if 0) query below */
initStringInfo(&returning_keys);
if (keys)
{
foreach (lc, keys)
{
Form_pg_attribute attr = (Form_pg_attribute) lfirst(lc);
char *resname = NameStr(attr->attname);
appendStringInfo(&returning_keys, "%s", quote_qualified_identifier("mv", resname));
if (lnext(keys, lc))
appendStringInfo(&returning_keys, ", ");
}
}
else
appendStringInfo(&returning_keys, "NULL");
/* Search for matching tuples from the view and update if found or insert if not. */
initStringInfo(&querybuf);
#if 0
appendStringInfo(&querybuf,
"WITH updt AS (" /* update a tuple if this exists in the view */
"UPDATE %s AS mv SET %s = mv.%s OPERATOR(pg_catalog.+) diff.%s "
"%s " /* SET clauses for aggregates */
"FROM %s AS diff "
"WHERE %s " /* tuple matching condition */
"RETURNING %s" /* returning keys of updated tuples */
") INSERT INTO %s (%s)" /* insert a new tuple if this doesn't exist */
"SELECT %s FROM %s AS diff "
"WHERE NOT EXISTS (SELECT 1 FROM updt AS mv WHERE %s);",
matviewname, count_colname, count_colname, count_colname,
(aggs_set != NULL ? aggs_set->data : ""),
deltaname_new,
match_cond,
returning_keys.data,
matviewname, target_list->data,
target_list->data, deltaname_new,
match_cond);
#else
/* Step 1: add the delta to the view rows that match a delta row. */
appendStringInfo(&querybuf,
"UPDATE %s AS mv SET %s = mv.%s OPERATOR(pg_catalog.+) diff.%s "
"%s " /* SET clauses for aggregates */
"FROM %s AS diff "
"WHERE %s ", /* tuple matching condition */
matviewname, count_colname, count_colname, count_colname,
(aggs_set != NULL ? aggs_set->data : ""),
deltaname_new,
match_cond);
if (SPI_exec(querybuf.data, 0) != SPI_OK_UPDATE)
elog(ERROR, "SPI_exec failed: %s", querybuf.data);
elogif(Debug_print_ivm, INFO, "IVM apply_new_delta_with_count update: %s", querybuf.data);
/*
* Step 2: insert the delta rows that have no matching row in the view
* itself (the disabled query checks against the rows returned by the
* UPDATE instead).
*/
resetStringInfo(&querybuf);
appendStringInfo(&querybuf,
"INSERT INTO %s (%s)" /* insert a new tuple if this doesn't exist */
"SELECT %s FROM %s AS diff "
"WHERE NOT EXISTS (SELECT 1 FROM %s AS mv WHERE %s);",
matviewname, target_list->data,
target_list->data, deltaname_new,
matviewname, match_cond);
#endif
/* Executes the INSERT built last in whichever branch above is compiled in. */
if (SPI_exec(querybuf.data, 0) != SPI_OK_INSERT)
elog(ERROR, "SPI_exec failed: %s", querybuf.data);
elogif(Debug_print_ivm,INFO, "IVM apply_new_delta_with_count insert: %s", querybuf.data);
}
/*
 * apply_new_delta
 *
 * Insert the tuples of the delta table deltaname_new into the materialized
 * view matviewname.  This variant is used when counting is not required:
 * every delta row is emitted diff."__ivm_count__" times by calling
 * generate_series() in the sub-select's target list.
 *
 * target_list is used both as the INSERT column list and as the SELECT
 * list.  No SPI_connect() is issued here, so the caller is expected to have
 * an SPI connection open already.
 */
static void
apply_new_delta(const char *matviewname, const char *deltaname_new,
				StringInfo target_list)
{
	StringInfoData sql;
	int			rc;

	/* Build the INSERT ... SELECT that expands each delta row by its count. */
	initStringInfo(&sql);
	appendStringInfo(&sql,
					"INSERT INTO %s (%s) SELECT %s FROM ("
						"SELECT diff.*, pg_catalog.generate_series(1, diff.\"__ivm_count__\")"
							" AS __ivm_generate_series__ "
						"FROM %s AS diff) AS v",
					matviewname, target_list->data, target_list->data,
					deltaname_new);

	rc = SPI_exec(sql.data, 0);
	if (rc != SPI_OK_INSERT)
		elog(ERROR, "SPI_exec failed: %s", sql.data);

	elogif(Debug_print_ivm, INFO, "IVM apply_new_delta: %s", sql.data);
}
/*
 * get_matching_condition_string
 *
 * Build the predicate text used to match a row of the view (alias "mv")
 * against a row of the delta (alias "diff") on the given key attributes.
 *
 * keys is a list of Form_pg_attribute.  For every key the generated clause
 * is "(mv.col = diff.col OR (mv.col IS NULL AND diff.col IS NULL))", and
 * the per-key clauses are joined with AND.  With no keys the constant
 * string "true" is returned.
 */
static char *
get_matching_condition_string(List *keys)
{
	StringInfoData cond;
	ListCell   *cell;

	/* Without key columns every row matches. */
	if (keys == NIL)
		return "true";

	initStringInfo(&cond);

	foreach (cell, keys)
	{
		Form_pg_attribute keyatt = (Form_pg_attribute) lfirst(cell);
		char	   *colname = NameStr(keyatt->attname);
		char	   *mv_col = quote_qualified_identifier("mv", colname);
		char	   *diff_col = quote_qualified_identifier("diff", colname);

		/*
		 * A plain equality test is never true for NULLs, so two NULL keys
		 * are explicitly treated as matching.
		 */
		appendStringInfoChar(&cond, '(');
		generate_equal(&cond, keyatt->atttypid, mv_col, diff_col);
		appendStringInfo(&cond, " OR (%s IS NULL AND %s IS NULL))",
						 mv_col, diff_col);

		if (lnext(keys, cell) != NULL)
			appendStringInfoString(&cond, " AND ");
	}

	return cond.data;
}
/*
 * generate_equal
 *
 * Append "leftop = rightop" to querybuf, using the equality operator that
 * the type cache reports for type opttype.  Both operands are treated as
 * being of that type.  Raises an error when the type has no equality
 * operator.
 */
static void
generate_equal(StringInfo querybuf, Oid opttype,
			   const char *leftop, const char *rightop)
{
	TypeCacheEntry *tce = lookup_type_cache(opttype, TYPECACHE_EQ_OPR);
	Oid			eq_opr = tce->eq_opr;

	if (!OidIsValid(eq_opr))
		ereport(ERROR,
				(errcode(ERRCODE_UNDEFINED_FUNCTION),
				 errmsg("could not identify an equality operator for type %s",
						format_type_be_qualified(opttype))));

	generate_operator_clause(querybuf,
							 leftop, opttype,
							 eq_opr,
							 rightop, opttype);
}
/*
 * mv_InitHashTables
 *
 * Create the two hash tables used by IVM:
 *	- mv_trigger_info, built with hash_create(), keyed by an Oid and holding
 *	  MV_TriggerHashEntry entries;
 *	- mv_trigger_snapshot, built in shared memory with ShmemInitHash(), keyed
 *	  by a string of SNAPSHOT_KEYSIZE bytes and holding SnapshotDumpEntry
 *	  entries.
 */
void
mv_InitHashTables(void)
{
	HASHCTL		trigger_ctl;
	HASHCTL		snapshot_ctl;

	/* Trigger-info table: binary (Oid) keys. */
	memset(&trigger_ctl, 0, sizeof(trigger_ctl));
	trigger_ctl.keysize = sizeof(Oid);
	trigger_ctl.entrysize = sizeof(MV_TriggerHashEntry);
	mv_trigger_info = hash_create("MV trigger info",
								  MV_INIT_QUERYHASHSIZE,
								  &trigger_ctl,
								  HASH_ELEM | HASH_BLOBS);

	/* Snapshot table: string keys hashed with string_hash. */
	memset(&snapshot_ctl, 0, sizeof(snapshot_ctl));
	snapshot_ctl.keysize = SNAPSHOT_KEYSIZE;
	snapshot_ctl.entrysize = sizeof(SnapshotDumpEntry);
	snapshot_ctl.hash = string_hash;
	mv_trigger_snapshot = ShmemInitHash("MV trigger snapshot",
										MV_INIT_SNAPSHOTHASHSIZE,
										MV_INIT_SNAPSHOTHASHSIZE,
										&snapshot_ctl,
										HASH_ELEM | HASH_FUNCTION);
}
/*
 * mv_TableShmemSize
 *
 * Return the estimated size of a hash table with MV_INIT_SNAPSHOTHASHSIZE
 * entries of type SnapshotDumpEntry, i.e. the sizing used for the
 * "MV trigger snapshot" table.
 */
Size
mv_TableShmemSize(void)
{
	Size		snapshot_table_size;

	snapshot_table_size = hash_estimate_size(MV_INIT_SNAPSHOTHASHSIZE,
											 sizeof(SnapshotDumpEntry));
	return snapshot_table_size;
}
/*
 * AtAbort_IVM
 *
 * Clean up hash entries for all materialized views.  This is called at
 * transaction abort and simply runs the end-of-transaction cleanup with
 * isCommit = false.
 */
void
AtAbort_IVM(void)
{
	AtEOXact_IVM(false);
}
/*
 * AtEOXact_IVM
 *
 * End-of-transaction cleanup for IVM.  Walks the list of materialized view
 * OIDs returned by AfterTriggerGetMvList(); for every OID that still has an
 * entry in mv_trigger_info, the entry's reference count is decremented, the
 * entry is cleaned up once the count reaches zero, and the entry's resource
 * owner and memory context are released and deleted.
 *
 * isCommit is forwarded to clean_up_IVM_hash_entry().
 */
void
AtEOXact_IVM(bool isCommit)
{
ListCell *lc;
List *mvlist = AfterTriggerGetMvList();
foreach(lc, mvlist)
{
Oid matviewOid = lfirst_oid(lc);
{
MV_TriggerHashEntry *entry = (MV_TriggerHashEntry *) hash_search(mv_trigger_info,
(void *) &matviewOid,
HASH_FIND, NULL);
if (entry != NULL)
{
entry->reference--;
/*
 * Remember the owner and context now: clean_up_IVM_hash_entry() removes
 * the entry from mv_trigger_info, after which entry must not be read.
 */
ResourceOwner resowner = entry->resowner;
MemoryContext ctx = entry->context;
if (entry->reference == 0)
{
/*
 * NOTE(review): the second parameter of clean_up_IVM_hash_entry() is
 * named is_abort, yet isCommit is passed here - confirm the polarity.
 */
clean_up_IVM_hash_entry(entry, isCommit);
}
/*
 * NOTE(review): the owner and context are released even when the
 * reference count is still positive, in which case the entry stays in the
 * hash table pointing at them - confirm the entry is not used afterwards.
 */
if (resowner)
{
/* All three release phases are run with isCommit = false. */
ResourceOwnerRelease(resowner,
RESOURCE_RELEASE_BEFORE_LOCKS,
false, /* isCommit */
false); /* isTopLevel */
ResourceOwnerRelease(resowner,
RESOURCE_RELEASE_LOCKS,
false, /* isCommit */
false); /* isTopLevel */
ResourceOwnerRelease(resowner,
RESOURCE_RELEASE_AFTER_LOCKS,
false, /* isCommit */
false); /* isTopLevel */
ResourceOwnerDelete(resowner);
}
if (ctx)
{
MemoryContextReset(ctx);
MemoryContextDelete(ctx);
}
}
}
}
}
/*
 * clean_up_IVM_hash_entry
 *
 * Release the per-table state of a materialized view's trigger entry and
 * remove the entry from mv_trigger_info.
 *
 * For each table the tuplestore lists are freed (only the list cells; the
 * list contents are not touched here).  The tuple slot and the relation are
 * dropped/closed only when is_abort is false and the current resource owner
 * is the entry's own resource owner.  Finally the associated snapshot DSM
 * entry is cleaned up and the hash entry is removed, so entry must not be
 * used after this function returns.
 */
static void
clean_up_IVM_hash_entry(MV_TriggerHashEntry *entry, bool is_abort)
{
	ListCell   *cell;

	foreach(cell, entry->tables)
	{
		MV_TriggerTable *tbl = (MV_TriggerTable *) lfirst(cell);

		if (tbl->old_tuplestores != NIL)
		{
			list_free(tbl->old_tuplestores);
			tbl->old_tuplestores = NIL;
		}

		if (tbl->new_tuplestores != NIL)
		{
			list_free(tbl->new_tuplestores);
			tbl->new_tuplestores = NIL;
		}

		/* Slot and relation are only released explicitly in this case. */
		if (is_abort || CurrentResourceOwner != entry->resowner)
			continue;

		if (tbl->slot != NULL)
		{
			ExecDropSingleTupleTableSlot(tbl->slot);
			tbl->slot = NULL;
		}

		if (tbl->rel != NULL)
		{
			table_close(tbl->rel, NoLock);
			tbl->rel = NULL;
		}
	}

	if (entry->tables != NIL)
	{
		list_free(entry->tables);
		entry->tables = NIL;
	}

	clean_up_ivm_dsm_entry(entry);

	hash_search(mv_trigger_info, (void *) &entry->matview_id, HASH_REMOVE, NULL);
}
/*
 * clean_up_ivm_dsm_entry
 *
 * Remove the mv_trigger_snapshot entry named by entry->snapname, if any.
 *
 * Nothing is done unless the entry has a non-empty snapshot name and this
 * process is a writer (Gp_is_writer).  When the snapshot entry is found
 * under GPIVMResLock, its DSM segment is detached if this process created
 * it, the entry is removed from the table and entry->snapname is reset to
 * NULL.
 */
static void
clean_up_ivm_dsm_entry(MV_TriggerHashEntry *entry)
{
	SnapshotDumpEntry *dump;
	bool		found;

	if (entry->snapname == NULL || entry->snapname[0] == '\0' || !Gp_is_writer)
		return;

	LWLockAcquire(GPIVMResLock, LW_EXCLUSIVE);

	dump = (SnapshotDumpEntry *) hash_search(mv_trigger_snapshot,
											 (void *) entry->snapname,
											 HASH_FIND, &found);
	if (found)
	{
		/* Only the creator can detach the dsm segment. */
		if (dump->handle && dump->pid == MyProcPid)
		{
			Assert(entry->matview_id == dump->matview_id);
			dsm_detach(dump->segment);
			dump->handle = DSM_HANDLE_INVALID;
		}

		hash_search(mv_trigger_snapshot, (void *) entry->snapname,
					HASH_REMOVE, NULL);
		entry->snapname = NULL;
	}

	LWLockRelease(GPIVMResLock);
}
/*
 * isIvmName
 *
 * Return true when s is non-NULL and starts with the "__ivm_" prefix that
 * marks an IVM hidden column; return false otherwise (including for NULL).
 */
bool
isIvmName(const char *s)
{
	static const char ivm_prefix[] = "__ivm_";

	return s != NULL &&
		strncmp(s, ivm_prefix, sizeof(ivm_prefix) - 1) == 0;
}
/*
 * refresh_matview_memoryfill
 *
 * Rewrite, plan and execute the given query, sending its output to dest.
 *
 * The query is copied before rewriting so the caller's tree is preserved.
 * The resulting plan is tagged with an IVM IntoClause built from
 * queryString and matviewRel (see makeIvmIntoClause()).
 *
 * Parameters:
 *	- dest: receiver handed to CreateQueryDesc()
 *	- query: the query to run (not modified)
 *	- queryEnv: query environment passed to CreateQueryDesc(), may be NULL
 *	- resultTupleDesc: if not NULL, receives a copy of the result descriptor
 *	- queryString: source text handed to the planner/executor; also used as
 *	  the enrname of the IntoClause
 *	- matviewRel: the materialized view, source of the distribution policy
 *
 * Returns the es_processed counter of the executed query.
 */
static uint64
refresh_matview_memoryfill(DestReceiver *dest, Query *query,
						   QueryEnvironment *queryEnv,
						   TupleDesc *resultTupleDesc,
						   const char *queryString, Relation matviewRel)
{
	List	   *rewritten;
	PlannedStmt *plan;
	QueryDesc  *queryDesc;
	Query	   *copied_query;
	uint64		processed;
	/* Saved now and restored right before the executor is started. */
	List	   *saved_dispatch_oids = GetAssignedOidsForDispatch();

	/* Lock and rewrite, using a copy to preserve the original query. */
	copied_query = copyObject(query);
	AcquireRewriteLocks(copied_query, true, false);
	rewritten = QueryRewrite(copied_query);

	/* SELECT should never rewrite to more or less than one SELECT query */
	if (list_length(rewritten) != 1)
		elog(ERROR, "unexpected rewrite result for refresh_matview_memoryfill.");
	query = (Query *) linitial(rewritten);
	query->parentStmtType = PARENTSTMTTYPE_CTAS;
	query->intoPolicy = matviewRel->rd_cdbpolicy;

	/* Check for user-requested abort. */
	CHECK_FOR_INTERRUPTS();

	/*
	 * Close parallel insert to tuplestore.
	 * NOTE(review): the plan is still requested with CURSOR_OPT_PARALLEL_OK -
	 * confirm how this relates to the statement above.
	 */
	plan = pg_plan_query(query, queryString, CURSOR_OPT_PARALLEL_OK, NULL);
	plan->intoClause = makeIvmIntoClause(queryString, matviewRel);
	plan->refreshClause = NULL;

	/*
	 * Use a snapshot with an updated command ID to ensure this query sees
	 * results of any previously executed queries.  (This could only matter if
	 * the planner executed an allegedly-stable function that changed the
	 * database contents, but let's do it anyway to be safe.)
	 */
	PushCopiedSnapshot(GetActiveSnapshot());
	UpdateActiveSnapshotCommandId();

	/* Create a QueryDesc, redirecting output to our tuple receiver */
	queryDesc = CreateQueryDesc(plan, queryString,
								GetActiveSnapshot(), InvalidSnapshot,
								dest, NULL, queryEnv, 0);

	RestoreOidAssignments(saved_dispatch_oids);

	/* call ExecutorStart to prepare the plan for execution */
	ExecutorStart(queryDesc, 0);

	/* run the plan */
	ExecutorRun(queryDesc, ForwardScanDirection, 0L, true);

	if (resultTupleDesc)
		*resultTupleDesc = CreateTupleDescCopy(queryDesc->tupDesc);

	/* and clean up */
	ExecutorFinish(queryDesc);
	ExecutorEnd(queryDesc);

	/* Must be read before the QueryDesc is freed. */
	processed = queryDesc->es_processed;

	FreeQueryDesc(queryDesc);

	PopActiveSnapshot();

	/* uint64 is not "unsigned long" everywhere: cast to match the format. */
	elogif(Debug_print_ivm, INFO, "IVM processed %s, %llu tuples.",
		   queryString, (unsigned long long) processed);

	return processed;
}
/*
 * AddPreassignedMVEntry
 *
 * Add a preassigned materialized view entry to the hash table.
 *
 * Looks up (or creates) the mv_trigger_info entry for matview_id.  A new
 * entry gets its own memory context under TopMemoryContext and its own
 * resource owner under TopTransactionResourceOwner.  The snapshot name is
 * stored in the entry, the snapshot itself is imported through
 * ivm_import_snapshot(), a table slot is added to entry->tables if none
 * matches table_id, and matview_id is appended to the after-trigger
 * materialized view list.
 *
 * Parameters:
 *	- matview_id: OID of the materialized view
 *	- table_id: OID of the base table
 *	- snapname: name of the snapshot to import; must not be NULL
 */
void
AddPreassignedMVEntry(Oid matview_id, Oid table_id, const char* snapname)
{
	MV_TriggerTable *table;
	bool		found;
	ListCell   *lc;
	MV_TriggerHashEntry *entry;
	MemoryContext oldcxt;
	ResourceOwner oldowner;
	size_t		len = strlen(snapname);

	entry = (MV_TriggerHashEntry *) hash_search(mv_trigger_info,
												(void *) &matview_id,
												HASH_ENTER, &found);

	if (!found)
	{
		entry->context = AllocSetContextCreate(TopMemoryContext,
											   "IVM Reader Session",
											   ALLOCSET_DEFAULT_SIZES);
		entry->resowner = ResourceOwnerCreate(TopTransactionResourceOwner, "IVM Reader Session");
		entry->matview_id = matview_id;
		entry->reference = 1;
		entry->tables = NIL;
		entry->has_old = false;
		entry->has_new = false;
		entry->pid = MyProcPid;
		entry->snapshot = NULL;
		entry->snapname = NULL;
	}

	/* Switch to the entry's resource owner and memory context */
	oldowner = CurrentResourceOwner;
	CurrentResourceOwner = entry->resowner;
	oldcxt = MemoryContextSwitchTo(entry->context);

	/*
	 * Copy the snapshot name.  The existing buffer is reused only when the
	 * string it currently holds is at least as long as the new name, which
	 * guarantees the new name (and its terminator) fits.  Otherwise a fresh
	 * copy is allocated in the entry's context; writing a longer name into
	 * the old buffer would run past its end.
	 */
	if (entry->snapname != NULL && strlen(entry->snapname) >= len)
		memcpy(entry->snapname, snapname, len + 1);
	else
		entry->snapname = pstrdup(snapname);

	/* Import the snapshot */
	entry->snapshot = ivm_import_snapshot(snapname);
	Assert(entry->snapshot);

	found = false;
	foreach(lc, entry->tables)
	{
		table = (MV_TriggerTable *) lfirst(lc);
		if (table->table_id == table_id)
		{
			found = true;
			break;
		}
	}

	if (!found)
	{
		/*
		 * NOTE(review): the new table is zero-filled and its table_id is
		 * never set to table_id, so the lookup above cannot match tables
		 * created here - confirm whether that is intended.
		 */
		table = (MV_TriggerTable *) palloc0(sizeof(MV_TriggerTable));
		entry->tables = lappend(entry->tables, table);
	}

	/* Switch back to the old memory context and resource owner */
	MemoryContextSwitchTo(oldcxt);
	CurrentResourceOwner = oldowner;

	AfterTriggerAppendMvList(matview_id);

	return;
}
/*
 * ivm_immediate_cleanup
 *
 * SQL-callable function taking the OID of a materialized view.  If the view
 * has an entry in mv_trigger_info, the entry is cleaned up (with
 * is_abort = true), its resource owner is released through all three
 * release phases and its memory context is reset and deleted.  The resource
 * owner itself is not deleted here.
 *
 * Always returns true.
 */
Datum
ivm_immediate_cleanup(PG_FUNCTION_ARGS)
{
	Oid			matview_id = PG_GETARG_OID(0);
	MV_TriggerHashEntry *entry;

	entry = (MV_TriggerHashEntry *) hash_search(mv_trigger_info,
												(void *) &matview_id,
												HASH_FIND, NULL);
	if (entry != NULL)
	{
		/*
		 * Fetch these first: the cleanup call removes the entry from the
		 * hash table.
		 */
		ResourceOwner owner = entry->resowner;
		MemoryContext cxt = entry->context;

		clean_up_IVM_hash_entry(entry, true);

		if (owner != NULL)
		{
			ResourceOwnerRelease(owner,
								 RESOURCE_RELEASE_BEFORE_LOCKS,
								 false, /* isCommit */
								 false);	/* isTopLevel */
			ResourceOwnerRelease(owner,
								 RESOURCE_RELEASE_LOCKS,
								 false, /* isCommit */
								 false);	/* isTopLevel */
			ResourceOwnerRelease(owner,
								 RESOURCE_RELEASE_AFTER_LOCKS,
								 false, /* isCommit */
								 false);	/* isTopLevel */
		}

		if (cxt != NULL)
		{
			MemoryContextReset(cxt);
			MemoryContextDelete(cxt);
		}
	}

	PG_RETURN_BOOL(true);
}
/*
 * apply_cleanup
 *
 * Apply immediate cleanup for a materialized view by running
 * "SELECT pg_catalog.ivm_immediate_cleanup(<oid>)" through SPI.  The SPI
 * connection is opened and closed inside this function.
 *
 * Parameters:
 *	- matview_id: OID of the materialized view to clean up
 */
static void
apply_cleanup(Oid matview_id)
{
	StringInfoData querybuf;

	/* Open SPI context. */
	if (SPI_connect() != SPI_OK_CONNECT)
		elog(ERROR, "SPI_connect failed");

	/* Oid is unsigned: %u keeps values above INT_MAX from turning negative. */
	initStringInfo(&querybuf);
	appendStringInfo(&querybuf,
					 "SELECT pg_catalog.ivm_immediate_cleanup(%u::pg_catalog.oid)",
					 matview_id);

	if (SPI_exec(querybuf.data, 0) != SPI_OK_SELECT)
		elog(ERROR, "SPI_exec failed: %s", querybuf.data);

	elogif(Debug_print_ivm, INFO, "IVM apply_cleanup: %s", querybuf.data);

	/* Close SPI context. */
	if (SPI_finish() != SPI_OK_FINISH)
		elog(ERROR, "SPI_finish failed");

	return;
}
/*
 * ivm_export_snapshot
 *
 * Export a snapshot in QEs by running
 * "SELECT pg_catalog.pg_export_snapshot_def(<oid>, '<snapname>')" through
 * SPI.  The SPI connection is opened and closed inside this function.
 *
 * Parameters:
 *	- matview_id: the OID of the materialized view
 *	- snapname: the name of the snapshot to export; it is placed between
 *	  single quotes as-is, so it must not contain quote characters
 */
static void
ivm_export_snapshot(Oid matview_id, char *snapname)
{
	StringInfoData querybuf;

	/* Open SPI context. */
	if (SPI_connect() != SPI_OK_CONNECT)
		elog(ERROR, "SPI_connect failed");

	/* Oid is unsigned: %u keeps values above INT_MAX from turning negative. */
	initStringInfo(&querybuf);
	appendStringInfo(&querybuf,
					 "SELECT pg_catalog.pg_export_snapshot_def(%u::pg_catalog.oid, '%s')",
					 matview_id, snapname);

	if (SPI_exec(querybuf.data, 0) != SPI_OK_SELECT)
		elog(ERROR, "SPI_exec failed: %s", querybuf.data);

	elogif(Debug_print_ivm, INFO, "IVM ivm_export_snapshot: %s", querybuf.data);

	/* Close SPI context. */
	if (SPI_finish() != SPI_OK_FINISH)
		elog(ERROR, "SPI_finish failed");

	return;
}
/*
* pg_export_snapshot_def
* SQL-callable wrapper for export snapshot.
*/
Datum
pg_export_snapshot_def(PG_FUNCTION_ARGS)
{
bool found;
Oid matview_id = PG_GETARG_OID(0);
char *snapname = text_to_cstring(PG_GETARG_TEXT_PP(1));
Snapshot snapshot = GetActiveSnapshot();
SnapshotDumpEntry *pDump;
dsm_segment *segment;
Assert(snapshot->snapshot_type == SNAPSHOT_MVCC);
Assert(snapname);
LWLockAcquire(GPIVMResLock, LW_EXCLUSIVE);
pDump = (SnapshotDumpEntry *) hash_search(mv_trigger_snapshot,
(void *) snapname,
HASH_ENTER_NULL,
&found);
if (!pDump)
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("out of shared memory"),
errhint("out of cross-slice trigger snapshot slots.")));
if (!found)
{
Size sz = EstimateSnapshotSpace(snapshot);
segment = dsm_create(sz, 0);
char *ptr = dsm_segment_address(segment);
SerializeSnapshot(snapshot, ptr);
pDump->segment = segment;
pDump->handle = dsm_segment_handle(segment);
pDump->pid = MyProcPid;
pDump->matview_id = matview_id;
dsm_pin_mapping(segment);
}
LWLockRelease(GPIVMResLock);
PG_RETURN_TEXT_P(cstring_to_text(snapname));
}
/*
 * ExecuteTruncateGuts_IVM
 *
 * Rebuild the contents of a materialized view: a transient heap is created,
 * filled by running the view's query as rewritten by rewriteQueryForIMMV(),
 * and then swapped in for the view's current storage.
 *
 * Parameters:
 *	- matviewRel: the relation of the materialized view
 *	- matviewOid: the OID of the materialized view
 *	- query: the query used to regenerate the data for the materialized view
 */
static void
ExecuteTruncateGuts_IVM(Relation matviewRel,
						Oid matviewOid,
						Query *query)
{
	Oid			OIDNewHeap;
	DestReceiver *dest;
	Query	   *dataQuery = rewriteQueryForIMMV(query, NIL);
	char		relpersistence = matviewRel->rd_rel->relpersistence;
	RefreshClause *refreshClause;
	RangeVar   *relation;

	/*
	 * Create the transient table that will receive the regenerated data. Lock
	 * it against access by any other process until commit (by which time it
	 * will be gone).
	 */
	OIDNewHeap = make_new_heap(matviewOid, matviewRel->rd_rel->reltablespace, matviewRel->rd_rel->relam,
							   relpersistence, ExclusiveLock, false, true);
	LockRelationOid(OIDNewHeap, AccessExclusiveLock);
	dest = CreateTransientRelDestReceiver(OIDNewHeap, matviewOid, false,
										  relpersistence, false);

	/* Schema-qualified name of the view, used to build the refresh clause. */
	relation = makeRangeVar(get_namespace_name(get_rel_namespace(matviewOid)),
							get_rel_name(matviewOid), -1);
	refreshClause = MakeRefreshClause(false, false, relation);

	dataQuery->intoPolicy = matviewRel->rd_cdbpolicy;
	dataQuery->parentStmtType = PARENTSTMTTYPE_REFRESH_MATVIEW;

	/* Generate the data; the processed-row count is not needed here. */
	(void) refresh_matview_datafill(dest, dataQuery, "truncate", refreshClause);

	/* Make the freshly filled heap the view's storage. */
	refresh_by_heap_swap(matviewOid, OIDNewHeap, relpersistence);
}
/*
* ivm_import_snapshot
*
* This function imports a snapshot from the hash table based on the given name.
*
* Parameters:
* - idstr: a string representing the id of the snapshot to import
*
* Returns:
* - the imported snapshot
*/
static Snapshot
ivm_import_snapshot(const char *idstr)
{
bool found;
SnapshotDumpEntry *pDump;
Snapshot snapshot = NULL;
LWLockAcquire(GPIVMResLock, LW_SHARED);
pDump = hash_search(mv_trigger_snapshot, (void*)idstr, HASH_FIND, &found);
Assert (found && pDump != NULL);
if (found)
{
Assert(pDump);
if (pDump->pid == MyProcPid)
{
char *ptr = dsm_segment_address(pDump->segment);
snapshot = RestoreSnapshot(ptr);
}
else
{
dsm_segment* segment = dsm_attach(pDump->handle);
char *ptr = dsm_segment_address(segment);
snapshot = RestoreSnapshot(ptr);
dsm_detach(segment);
}
}
LWLockRelease(GPIVMResLock);
return snapshot;
}
/*
 * makeIvmIntoClause
 *
 * Build the IntoClause node used for IVM (Incremental View Maintenance).
 *
 * The node has 'ivm' set to true, 'rel' set to NULL, 'enrname' pointing at
 * the caller's enrname string (not copied), 'distributedBy' built from
 * matviewRel via make_distributedby_for_rel(), and 'matviewOid' set to the
 * OID of matviewRel.
 *
 * Parameters:
 *	- enrname: a string representing the named tuplestore
 *	- matviewRel: a relation object representing the materialized view
 *
 * Returns:
 *	- a pointer to the created IntoClause node
 */
static IntoClause*
makeIvmIntoClause(const char *enrname, Relation matviewRel)
{
	IntoClause *into = makeNode(IntoClause);

	into->ivm = true;

	/* rel is NULL means putting tuples into memory. */
	into->rel = NULL;

	into->enrname = (char *) enrname;
	into->distributedBy = (Node *) make_distributedby_for_rel(matviewRel);
	into->matviewOid = RelationGetRelid(matviewRel);

	return into;
}
/*
 * transientenr_init
 *
 * Replace queryDesc->dest with a DestPersistentstore receiver configured
 * from the mv_trigger_info entry of the materialized view named in the
 * plan's IntoClause: the entry's resource owner and memory context are
 * handed to the receiver together with the IntoClause's enrname.
 *
 * The entry is expected to exist and the IntoClause is expected to be an
 * IVM one; both are only checked by assertions.
 *
 * Parameters:
 *	- queryDesc: the query descriptor
 */
void
transientenr_init(QueryDesc *queryDesc)
{
	IntoClause *into = queryDesc->plannedstmt->intoClause;
	Oid			matviewOid = into->matviewOid;
	MV_TriggerHashEntry *entry;

	entry = (MV_TriggerHashEntry *) hash_search(mv_trigger_info,
												(void *) &matviewOid,
												HASH_FIND, NULL);
	Assert(entry != NULL);
	Assert(into->ivm);

	queryDesc->dest = CreateDestReceiver(DestPersistentstore);
	SetPersistentTstoreDestReceiverParams(queryDesc->dest,
										  NULL,
										  entry->resowner,
										  entry->context,
										  true,
										  into->enrname);
}
/*
 * ivm_set_ts_persitent_name
 *
 * This function sets the transition table name for the IVM trigger.
 *
 * For INSERT, DELETE and UPDATE trigger events the matching command type is
 * passed to SetTransitionTableName(), which is invoked with
 * TopMemoryContext as the current memory context.  For any other event the
 * materialized view is only appended to the after-trigger materialized view
 * list.
 */
static void
ivm_set_ts_persitent_name(TriggerData *trigdata, Oid relid, Oid mvid)
{
	MemoryContext saved_cxt;
	CmdType		cmd;

	/* Events other than INSERT/DELETE/UPDATE only register the view. */
	if (!TRIGGER_FIRED_BY_INSERT(trigdata->tg_event) &&
		!TRIGGER_FIRED_BY_DELETE(trigdata->tg_event) &&
		!TRIGGER_FIRED_BY_UPDATE(trigdata->tg_event))
	{
		AfterTriggerAppendMvList(mvid);
		return;
	}

	if (TRIGGER_FIRED_BY_INSERT(trigdata->tg_event))
		cmd = CMD_INSERT;
	else if (TRIGGER_FIRED_BY_DELETE(trigdata->tg_event))
		cmd = CMD_DELETE;
	else
		cmd = CMD_UPDATE;

	saved_cxt = MemoryContextSwitchTo(TopMemoryContext);
	SetTransitionTableName(relid, cmd, mvid);
	MemoryContextSwitchTo(saved_cxt);
}