| /* ------------------------------------------------------------------------- |
| * |
| * gp_activetable.c |
| * |
| * This code is responsible for detecting active table for databases |
| * quotamodel will call gp_fetch_active_tables() to fetch the active tables |
| * and their size information in each loop. |
| * |
| * Copyright (c) 2018-2020 Pivotal Software, Inc. |
| * Copyright (c) 2020-Present VMware, Inc. or its affiliates |
| * |
| * IDENTIFICATION |
| * diskquota/gp_activetable.c |
| * |
| * ------------------------------------------------------------------------- |
| */ |
| #include "postgres.h" |
| |
| #include "access/htup_details.h" |
| #include "access/relation.h" |
| #include "access/xact.h" |
| #include "catalog/catalog.h" |
| #include "catalog/objectaccess.h" |
| #include "catalog/pg_extension.h" |
| #include "cdb/cdbdisp_query.h" |
| #include "cdb/cdbdispatchresult.h" |
| #include "cdb/cdbvars.h" |
| #include "commands/dbcommands.h" |
| #include "commands/extension.h" |
| #include "executor/spi.h" |
| #include "funcapi.h" |
| #include "libpq-fe.h" |
| #include "storage/smgr.h" |
| #include "utils/faultinjector.h" |
| #include "utils/lsyscache.h" |
| #include "utils/syscache.h" |
| #include "utils/inval.h" |
| #include "utils/array.h" |
| |
| #include "gp_activetable.h" |
| #include "diskquota.h" |
| #include "relation_cache.h" |
| |
| PG_FUNCTION_INFO_V1(diskquota_fetch_table_stat); |
| |
| /* The results set cache for SRF call*/ |
| typedef struct DiskQuotaSetOFCache |
| { |
| HTAB *result; |
| HASH_SEQ_STATUS pos; |
| } DiskQuotaSetOFCache; |
| |
| HTAB *active_tables_map = NULL; // Set<DiskQuotaActiveTableFileEntry> |
| |
| /* |
| * monitored_dbid_cache is a allow list for diskquota |
| * to know which databases it need to monitor. |
| * |
| * dbid will be added to it when creating diskquota extension |
| * dbid will be removed from it when droping diskquota extension |
| */ |
| HTAB *altered_reloid_cache = NULL; // Set<Oid> |
| |
| /* active table hooks which detect the disk file size change. */ |
| static file_create_hook_type prev_file_create_hook = NULL; |
| static file_extend_hook_type prev_file_extend_hook = NULL; |
| static file_truncate_hook_type prev_file_truncate_hook = NULL; |
| static file_unlink_hook_type prev_file_unlink_hook = NULL; |
| static object_access_hook_type prev_object_access_hook = NULL; |
| |
| static void active_table_hook_smgrcreate(RelFileNodeBackend rnode); |
| static void active_table_hook_smgrextend(RelFileNodeBackend rnode); |
| static void active_table_hook_smgrtruncate(RelFileNodeBackend rnode); |
| static void active_table_hook_smgrunlink(RelFileNodeBackend rnode); |
| static void object_access_hook_QuotaStmt(ObjectAccessType access, Oid classId, Oid objectId, int subId, void *arg); |
| |
| static HTAB *get_active_tables_stats(ArrayType *array); |
| static HTAB *get_active_tables_oid(void); |
| static HTAB *pull_active_list_from_seg(void); |
| static void pull_active_table_size_from_seg(HTAB *local_table_stats_map, char *active_oid_array); |
| static StringInfoData convert_map_to_string(HTAB *active_list); |
| static void load_table_size(HTAB *local_table_stats_map); |
| static void report_active_table_helper(const RelFileNodeBackend *relFileNode); |
| static void remove_from_active_table_map(const RelFileNodeBackend *relFileNode); |
| static void report_relation_cache_helper(Oid relid); |
| static void report_altered_reloid(Oid reloid); |
| static Oid get_dbid(ArrayType *array); |
| |
| void init_active_table_hook(void); |
| void init_shm_worker_active_tables(void); |
| void init_lock_active_tables(void); |
| HTAB *gp_fetch_active_tables(bool is_init); |
| |
| /* |
| * Init active_tables_map shared memory |
| */ |
| void |
| init_shm_worker_active_tables(void) |
| { |
| HASHCTL ctl; |
| |
| memset(&ctl, 0, sizeof(ctl)); |
| ctl.keysize = sizeof(DiskQuotaActiveTableFileEntry); |
| ctl.entrysize = sizeof(DiskQuotaActiveTableFileEntry); |
| active_tables_map = DiskquotaShmemInitHash("active_tables", diskquota_max_active_tables, |
| diskquota_max_active_tables, &ctl, HASH_ELEM, DISKQUOTA_TAG_HASH); |
| |
| memset(&ctl, 0, sizeof(ctl)); |
| ctl.keysize = sizeof(Oid); |
| ctl.entrysize = sizeof(Oid); |
| altered_reloid_cache = DiskquotaShmemInitHash("altered_reloid_cache", diskquota_max_active_tables, |
| diskquota_max_active_tables, &ctl, HASH_ELEM, DISKQUOTA_OID_HASH); |
| } |
| |
| /* |
| * Register disk file size change hook to detect active table. |
| */ |
| void |
| init_active_table_hook(void) |
| { |
| prev_file_create_hook = file_create_hook; |
| file_create_hook = active_table_hook_smgrcreate; |
| |
| prev_file_extend_hook = file_extend_hook; |
| file_extend_hook = active_table_hook_smgrextend; |
| |
| prev_file_truncate_hook = file_truncate_hook; |
| file_truncate_hook = active_table_hook_smgrtruncate; |
| |
| prev_file_unlink_hook = file_unlink_hook; |
| file_unlink_hook = active_table_hook_smgrunlink; |
| |
| prev_object_access_hook = object_access_hook; |
| object_access_hook = object_access_hook_QuotaStmt; |
| } |
| |
| /* |
| * File create hook is used to monitor a new file create event |
| */ |
| static void |
| active_table_hook_smgrcreate(RelFileNodeBackend rnode) |
| { |
| if (prev_file_create_hook) (*prev_file_create_hook)(rnode); |
| |
| SIMPLE_FAULT_INJECTOR("diskquota_after_smgrcreate"); |
| report_active_table_helper(&rnode); |
| } |
| |
| /* |
| * File extend hook is used to monitor file size extend event |
| * it could be extending a page for heap table or just monitoring |
| * file write for an append-optimize table. |
| */ |
| static void |
| active_table_hook_smgrextend(RelFileNodeBackend rnode) |
| { |
| if (prev_file_extend_hook) (*prev_file_extend_hook)(rnode); |
| |
| report_active_table_helper(&rnode); |
| quota_check_common(InvalidOid /*reloid*/, &rnode.node); |
| } |
| |
| /* |
| * File truncate hook is used to monitor a new file truncate event |
| */ |
| static void |
| active_table_hook_smgrtruncate(RelFileNodeBackend rnode) |
| { |
| if (prev_file_truncate_hook) (*prev_file_truncate_hook)(rnode); |
| |
| report_active_table_helper(&rnode); |
| } |
| |
| static void |
| active_table_hook_smgrunlink(RelFileNodeBackend rnode) |
| { |
| if (prev_file_unlink_hook) (*prev_file_unlink_hook)(rnode); |
| |
| /* |
| * Since we do not remove the relfilenode if it does not map to any valid |
| * relation oid, we need to do the cleaning here to avoid memory leak |
| */ |
| remove_from_active_table_map(&rnode); |
| remove_cache_entry(InvalidOid, rnode.node.relNode); |
| } |
| |
| static void |
| object_access_hook_QuotaStmt(ObjectAccessType access, Oid classId, Oid objectId, int subId, void *arg) |
| { |
| if (prev_object_access_hook) (*prev_object_access_hook)(access, classId, objectId, subId, arg); |
| |
| /* if is 'drop extension diskquota' */ |
| if (classId == ExtensionRelationId && access == OAT_DROP) |
| { |
| if (get_extension_oid("diskquota", true) == objectId) |
| { |
| invalidate_database_rejectmap(MyDatabaseId); |
| diskquota_stop_worker(); |
| } |
| return; |
| } |
| |
| /* TODO: do we need to use "&&" instead of "||"? */ |
| if (classId != RelationRelationId || subId != 0) |
| { |
| return; |
| } |
| |
| if (objectId < FirstNormalObjectId) |
| { |
| return; |
| } |
| |
| switch (access) |
| { |
| case OAT_POST_CREATE: |
| report_relation_cache_helper(objectId); |
| break; |
| case OAT_POST_ALTER: |
| SIMPLE_FAULT_INJECTOR("object_access_post_alter"); |
| report_altered_reloid(objectId); |
| break; |
| default: |
| break; |
| } |
| } |
| |
| static void |
| report_altered_reloid(Oid reloid) |
| { |
| /* |
| * We don't collect altered relations' reloid on mirrors |
| * and QD. |
| */ |
| if (IsRoleMirror() || IS_QUERY_DISPATCHER()) return; |
| |
| LWLockAcquire(diskquota_locks.altered_reloid_cache_lock, LW_EXCLUSIVE); |
| hash_search(altered_reloid_cache, &reloid, HASH_ENTER, NULL); |
| LWLockRelease(diskquota_locks.altered_reloid_cache_lock); |
| } |
| |
| static void |
| report_relation_cache_helper(Oid relid) |
| { |
| bool found; |
| Relation rel; |
| char relkind; |
| |
| /* We do not collect the active table in mirror segments */ |
| if (IsRoleMirror()) |
| { |
| return; |
| } |
| |
| /* |
| * Do not collect active table info when the database is not under monitoring. |
| * this operation is read-only and does not require absolutely exact. |
| * read the cache with out shared lock. |
| */ |
| LWLockAcquire(diskquota_locks.monitored_dbid_cache_lock, LW_SHARED); |
| hash_search(monitored_dbid_cache, &MyDatabaseId, HASH_FIND, &found); |
| LWLockRelease(diskquota_locks.monitored_dbid_cache_lock); |
| if (!found) |
| { |
| return; |
| } |
| |
| rel = diskquota_relation_open(relid); |
| if (rel == NULL) |
| { |
| return; |
| } |
| |
| relkind = rel->rd_rel->relkind; |
| |
| RelationClose(rel); |
| |
| if (relkind != RELKIND_FOREIGN_TABLE && relkind != RELKIND_COMPOSITE_TYPE && relkind != RELKIND_VIEW) |
| update_relation_cache(relid); |
| } |
| |
| /* |
| * Common function for reporting active tables |
| * Currently, any file events(create, extend. truncate) are |
| * treated the same and report_active_table_helper just put |
| * the corresponding relFileNode into the active_tables_map |
| */ |
| static void |
| report_active_table_helper(const RelFileNodeBackend *relFileNode) |
| { |
| DiskQuotaActiveTableFileEntry *entry; |
| DiskQuotaActiveTableFileEntry item; |
| bool found = false; |
| Oid dbid = relFileNode->node.dbNode; |
| |
| /* We do not collect the active table in mirror segments */ |
| if (IsRoleMirror()) |
| { |
| return; |
| } |
| |
| LWLockAcquire(diskquota_locks.monitored_dbid_cache_lock, LW_SHARED); |
| /* do not collect active table info when the database is not under monitoring. |
| * this operation is read-only and does not require absolutely exact. |
| * read the cache with out shared lock */ |
| hash_search(monitored_dbid_cache, &dbid, HASH_FIND, &found); |
| LWLockRelease(diskquota_locks.monitored_dbid_cache_lock); |
| if (!found) |
| { |
| return; |
| } |
| found = false; |
| |
| MemSet(&item, 0, sizeof(DiskQuotaActiveTableFileEntry)); |
| item.dbid = relFileNode->node.dbNode; |
| item.relfilenode = relFileNode->node.relNode; |
| item.tablespaceoid = relFileNode->node.spcNode; |
| |
| LWLockAcquire(diskquota_locks.active_table_lock, LW_EXCLUSIVE); |
| entry = hash_search(active_tables_map, &item, HASH_ENTER_NULL, &found); |
| if (entry && !found) *entry = item; |
| |
| if (!found && entry == NULL) |
| { |
| /* |
| * We may miss the file size change of this relation at current |
| * refresh interval. |
| */ |
| ereport(WARNING, (errmsg("Share memory is not enough for active tables."))); |
| } |
| LWLockRelease(diskquota_locks.active_table_lock); |
| } |
| |
| /* |
| * Remove relfilenode from the active table map if exists. |
| */ |
| static void |
| remove_from_active_table_map(const RelFileNodeBackend *relFileNode) |
| { |
| DiskQuotaActiveTableFileEntry item = {0}; |
| |
| item.dbid = relFileNode->node.dbNode; |
| item.relfilenode = relFileNode->node.relNode; |
| item.tablespaceoid = relFileNode->node.spcNode; |
| |
| LWLockAcquire(diskquota_locks.active_table_lock, LW_EXCLUSIVE); |
| hash_search(active_tables_map, &item, HASH_REMOVE, NULL); |
| LWLockRelease(diskquota_locks.active_table_lock); |
| } |
| |
| /* |
| * Interface of activetable module |
| * This function is called by quotamodel module. |
| * Disk quota worker process need to collect |
| * active table disk usage from all the segments. |
| * And aggregate the table size on each segment |
| * to get the real table size at cluster level. |
| */ |
| HTAB * |
| gp_fetch_active_tables(bool is_init) |
| { |
| HTAB *local_table_stats_map = NULL; |
| HASHCTL ctl; |
| HTAB *local_active_table_oid_maps; |
| StringInfoData active_oid_list; |
| |
| Assert(Gp_role == GP_ROLE_DISPATCH); |
| |
| memset(&ctl, 0, sizeof(ctl)); |
| ctl.keysize = sizeof(TableEntryKey); |
| ctl.entrysize = sizeof(DiskQuotaActiveTableEntry); |
| ctl.hcxt = CurrentMemoryContext; |
| |
| local_table_stats_map = diskquota_hash_create("local active table map with relfilenode info", 1024, &ctl, |
| HASH_ELEM | HASH_CONTEXT, DISKQUOTA_TAG_HASH); |
| |
| if (is_init) |
| { |
| load_table_size(local_table_stats_map); |
| } |
| else |
| { |
| /* step 1: fetch active oids from all the segments */ |
| local_active_table_oid_maps = pull_active_list_from_seg(); |
| active_oid_list = convert_map_to_string(local_active_table_oid_maps); |
| |
| ereport(DEBUG1, |
| (errcode(ERRCODE_INTERNAL_ERROR), errmsg("[diskquota] active_old_list = %s", active_oid_list.data))); |
| |
| /* step 2: fetch active table sizes based on active oids */ |
| pull_active_table_size_from_seg(local_table_stats_map, active_oid_list.data); |
| |
| hash_destroy(local_active_table_oid_maps); |
| pfree(active_oid_list.data); |
| } |
| return local_table_stats_map; |
| } |
| |
| /* |
| * Function to get the table size from each segments |
| * There are 4 modes: |
| * |
| * - FETCH_ACTIVE_OID: gather active table oid from all the segments, since |
| * table may only be modified on a subset of the segments, we need to firstly |
| * gather the active table oid list from all the segments. |
| * |
| * - FETCH_ACTIVE_SIZE: calculate the active table size based on the active |
| * table oid list. |
| * |
| * - ADD_DB_TO_MONITOR: add MyDatabaseId to the monitored db cache so that |
| * active tables in the current database will be recorded. This is used each |
| * time a worker starts. |
| * |
| * - REMOVE_DB_FROM_BEING_MONITORED: remove MyDatabaseId from the monitored |
| * db cache so that active tables in the current database will be recorded. |
| * This is used when DROP EXTENSION. |
| */ |
| Datum |
| diskquota_fetch_table_stat(PG_FUNCTION_ARGS) |
| { |
| FuncCallContext *funcctx; |
| int32 mode = PG_GETARG_INT32(0); |
| AttInMetadata *attinmeta; |
| bool isFirstCall = true; |
| Oid dbid; |
| |
| HTAB *localCacheTable = NULL; |
| DiskQuotaSetOFCache *cache = NULL; |
| DiskQuotaActiveTableEntry *results_entry = NULL; |
| |
| #ifdef FAULT_INJECTOR |
| if (SIMPLE_FAULT_INJECTOR("ereport_warning_from_segment") == FaultInjectorTypeSkip) |
| { |
| ereport(WARNING, (errmsg("[Fault Injector] This is a warning reported from segment"))); |
| } |
| #endif |
| |
| /* Init the container list in the first call and get the results back */ |
| if (SRF_IS_FIRSTCALL()) |
| { |
| MemoryContext oldcontext; |
| TupleDesc tupdesc; |
| int ret_code = SPI_connect(); |
| if (ret_code != SPI_OK_CONNECT) |
| { |
| ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), |
| errmsg("unable to connect to execute internal query. return code: %d.", ret_code))); |
| } |
| SPI_finish(); |
| |
| /* create a function context for cross-call persistence */ |
| funcctx = SRF_FIRSTCALL_INIT(); |
| |
| /* switch to memory context appropriate for multiple function calls */ |
| oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx); |
| |
| if (Gp_role == GP_ROLE_DISPATCH || Gp_role == GP_ROLE_UTILITY) |
| { |
| ereport(ERROR, (errmsg("This function must not be called on master or by user"))); |
| } |
| |
| switch (mode) |
| { |
| case FETCH_ACTIVE_OID: |
| localCacheTable = get_active_tables_oid(); |
| break; |
| case FETCH_ACTIVE_SIZE: |
| localCacheTable = get_active_tables_stats(PG_GETARG_ARRAYTYPE_P(1)); |
| break; |
| /*TODO: add another UDF to update the monitored_db_cache */ |
| case ADD_DB_TO_MONITOR: |
| dbid = get_dbid(PG_GETARG_ARRAYTYPE_P(1)); |
| update_monitor_db(dbid, ADD_DB_TO_MONITOR); |
| PG_RETURN_NULL(); |
| case REMOVE_DB_FROM_BEING_MONITORED: |
| dbid = get_dbid(PG_GETARG_ARRAYTYPE_P(1)); |
| update_monitor_db(dbid, REMOVE_DB_FROM_BEING_MONITORED); |
| PG_RETURN_NULL(); |
| case PAUSE_DB_TO_MONITOR: |
| dbid = get_dbid(PG_GETARG_ARRAYTYPE_P(1)); |
| update_monitor_db(dbid, PAUSE_DB_TO_MONITOR); |
| PG_RETURN_NULL(); |
| case RESUME_DB_TO_MONITOR: |
| dbid = get_dbid(PG_GETARG_ARRAYTYPE_P(1)); |
| update_monitor_db(dbid, RESUME_DB_TO_MONITOR); |
| PG_RETURN_NULL(); |
| default: |
| ereport(ERROR, (errmsg("Unused mode number %d, transaction will be aborted", mode))); |
| break; |
| } |
| |
| /* |
| * total number of active tables to be returned, each tuple contains |
| * one active table stat |
| */ |
| funcctx->max_calls = localCacheTable ? (uint32)hash_get_num_entries(localCacheTable) : 0; |
| |
| /* |
| * prepare attribute metadata for next calls that generate the tuple |
| */ |
| tupdesc = DiskquotaCreateTemplateTupleDesc(3); |
| TupleDescInitEntry(tupdesc, (AttrNumber)1, "TABLE_OID", OIDOID, -1, 0); |
| TupleDescInitEntry(tupdesc, (AttrNumber)2, "TABLE_SIZE", INT8OID, -1, 0); |
| TupleDescInitEntry(tupdesc, (AttrNumber)3, "GP_SEGMENT_ID", INT2OID, -1, 0); |
| |
| attinmeta = TupleDescGetAttInMetadata(tupdesc); |
| funcctx->attinmeta = attinmeta; |
| |
| /* Prepare SetOf results HATB */ |
| cache = (DiskQuotaSetOFCache *)palloc(sizeof(DiskQuotaSetOFCache)); |
| cache->result = localCacheTable; |
| hash_seq_init(&(cache->pos), localCacheTable); |
| |
| MemoryContextSwitchTo(oldcontext); |
| } |
| else |
| { |
| isFirstCall = false; |
| } |
| |
| funcctx = SRF_PERCALL_SETUP(); |
| |
| if (isFirstCall) |
| { |
| funcctx->user_fctx = (void *)cache; |
| } |
| else |
| { |
| cache = (DiskQuotaSetOFCache *)funcctx->user_fctx; |
| } |
| |
| /* return the results back to SPI caller */ |
| while ((results_entry = (DiskQuotaActiveTableEntry *)hash_seq_search(&(cache->pos))) != NULL) |
| { |
| Datum result; |
| Datum values[3]; |
| bool nulls[3]; |
| HeapTuple tuple; |
| |
| memset(values, 0, sizeof(values)); |
| memset(nulls, false, sizeof(nulls)); |
| |
| values[0] = ObjectIdGetDatum(results_entry->reloid); |
| values[1] = Int64GetDatum(results_entry->tablesize); |
| values[2] = Int16GetDatum(results_entry->segid); |
| |
| tuple = heap_form_tuple(funcctx->attinmeta->tupdesc, values, nulls); |
| |
| result = HeapTupleGetDatum(tuple); |
| |
| SRF_RETURN_NEXT(funcctx, result); |
| } |
| |
| /* finished, do the clear staff */ |
| hash_destroy(cache->result); |
| pfree(cache); |
| SRF_RETURN_DONE(funcctx); |
| } |
| |
| static Oid |
| get_dbid(ArrayType *array) |
| { |
| Assert(ARR_ELEMTYPE(array) == OIDOID); |
| char *ptr; |
| bool typbyval; |
| int16 typlen; |
| char typalign; |
| Oid dbid; |
| |
| get_typlenbyvalalign(ARR_ELEMTYPE(array), &typlen, &typbyval, &typalign); |
| ptr = ARR_DATA_PTR(array); |
| dbid = DatumGetObjectId(fetch_att(ptr, typbyval, typlen)); |
| return dbid; |
| } |
| |
| /* |
| * Call pg_table_size to calcualte the |
| * active table size on each segments. |
| */ |
| static HTAB * |
| get_active_tables_stats(ArrayType *array) |
| { |
| int ndim = ARR_NDIM(array); |
| int *dims = ARR_DIMS(array); |
| int nitems; |
| int16 typlen; |
| bool typbyval; |
| char typalign; |
| char *ptr; |
| bits8 *bitmap; |
| int bitmask; |
| int i; |
| Oid relOid; |
| int segId; |
| HTAB *local_table = NULL; |
| HASHCTL ctl; |
| TableEntryKey key; |
| DiskQuotaActiveTableEntry *entry; |
| bool found; |
| |
| Assert(ARR_ELEMTYPE(array) == OIDOID); |
| |
| nitems = ArrayGetNItems(ndim, dims); |
| |
| get_typlenbyvalalign(ARR_ELEMTYPE(array), &typlen, &typbyval, &typalign); |
| |
| ptr = ARR_DATA_PTR(array); |
| bitmap = ARR_NULLBITMAP(array); |
| bitmask = 1; |
| |
| memset(&ctl, 0, sizeof(ctl)); |
| ctl.keysize = sizeof(TableEntryKey); |
| ctl.entrysize = sizeof(DiskQuotaActiveTableEntry); |
| ctl.hcxt = CurrentMemoryContext; |
| local_table = diskquota_hash_create("local table map", 1024, &ctl, HASH_ELEM | HASH_CONTEXT, DISKQUOTA_TAG_HASH); |
| |
| for (i = 0; i < nitems; i++) |
| { |
| /* |
| * handle array containing NULL case for general inupt, but the active |
| * table oid array would not contain NULL in fact |
| */ |
| if (bitmap && (*bitmap & bitmask) == 0) |
| { |
| continue; |
| } |
| else |
| { |
| relOid = DatumGetObjectId(fetch_att(ptr, typbyval, typlen)); |
| segId = GpIdentity.segindex; |
| key.reloid = relOid; |
| key.segid = segId; |
| |
| entry = (DiskQuotaActiveTableEntry *)hash_search(local_table, &key, HASH_ENTER, &found); |
| if (!found) |
| { |
| entry->reloid = relOid; |
| entry->segid = segId; |
| entry->tablesize = calculate_table_size(relOid); |
| } |
| |
| ptr = att_addlength_pointer(ptr, typlen, ptr); |
| ptr = (char *)att_align_nominal(ptr, typalign); |
| } |
| |
| /* advance bitmap pointer if any */ |
| if (bitmap) |
| { |
| bitmask <<= 1; |
| if (bitmask == 0x100) |
| { |
| bitmap++; |
| bitmask = 1; |
| } |
| } |
| } |
| |
| return local_table; |
| } |
| |
| /* |
| * SetLocktagRelationOid |
| * Set up a locktag for a relation, given only relation OID |
| */ |
| static inline void |
| SetLocktagRelationOid(LOCKTAG *tag, Oid relid) |
| { |
| Oid dbid; |
| |
| if (IsSharedRelation(relid)) |
| dbid = InvalidOid; |
| else |
| dbid = MyDatabaseId; |
| |
| SET_LOCKTAG_RELATION(*tag, dbid, relid); |
| } |
| |
| static bool |
| is_relation_being_altered(Oid relid) |
| { |
| LOCKTAG locktag; |
| SetLocktagRelationOid(&locktag, relid); |
| VirtualTransactionId *vxid_list = GetLockConflicts(&locktag, AccessShareLock, NULL); |
| bool being_altered = VirtualTransactionIdIsValid(*vxid_list); /* if vxid_list is empty */ |
| pfree(vxid_list); |
| return being_altered; |
| } |
| |
| /* |
| * Check whether the cached relfilenode is stale compared to the given one |
| * due to delayed cache invalidation messages. |
| * |
| * NOTE: It will return false if the relation is currently uncommitted. |
| */ |
| static bool |
| is_cached_relfilenode_stale(Oid relOid, RelFileNode rnode) |
| { |
| /* |
| * Since we don't take any lock on relation, need to check for cache |
| * invalidation messages manually. |
| */ |
| AcceptInvalidationMessages(); |
| HeapTuple tp = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(relOid)); |
| |
| /* |
| * Tuple is not valid if |
| * - The relation has not been committed yet, or |
| * - The relation has been deleted |
| */ |
| if (!HeapTupleIsValid(tp)) return false; |
| Form_pg_class reltup = (Form_pg_class)GETSTRUCT(tp); |
| |
| /* |
| * If cache invalidation messages are not delievered in time, the |
| * relfilenode in the tuple of the relation is stale. In that case, |
| * the relfilenode in the relation tuple is not equal to the one in |
| * the active table map. |
| */ |
| Oid cached_relfilenode = reltup->relfilenode; |
| bool is_stale = cached_relfilenode != rnode.relNode; |
| heap_freetuple(tp); |
| return is_stale; |
| } |
| |
| /* |
| * Get local active table with table oid and table size info. |
| * This function first copies active table map from shared memory |
| * to local active table map with refilenode info. Then traverses |
| * the local map and find corresponding table oid and table file |
| * size. Finally stores them into local active table map and return. |
| */ |
| static HTAB * |
| get_active_tables_oid(void) |
| { |
| HASHCTL ctl; |
| HTAB *local_active_table_file_map = NULL; |
| HTAB *local_active_table_stats_map = NULL; |
| HTAB *local_altered_reloid_cache = NULL; |
| HASH_SEQ_STATUS iter; |
| DiskQuotaActiveTableFileEntry *active_table_file_entry; |
| DiskQuotaActiveTableEntry *active_table_entry; |
| Oid *altered_reloid_entry; |
| |
| Oid relOid; |
| |
| refresh_monitored_dbid_cache(); |
| |
| memset(&ctl, 0, sizeof(ctl)); |
| ctl.keysize = sizeof(DiskQuotaActiveTableFileEntry); |
| ctl.entrysize = sizeof(DiskQuotaActiveTableFileEntry); |
| ctl.hcxt = CurrentMemoryContext; |
| local_active_table_file_map = diskquota_hash_create("local active table map with relfilenode info", 1024, &ctl, |
| HASH_ELEM | HASH_CONTEXT, DISKQUOTA_TAG_HASH); |
| |
| memset(&ctl, 0, sizeof(ctl)); |
| ctl.keysize = sizeof(Oid); |
| ctl.entrysize = sizeof(Oid); |
| ctl.hcxt = CurrentMemoryContext; |
| local_altered_reloid_cache = diskquota_hash_create("local_altered_reloid_cache", 1024, &ctl, |
| HASH_ELEM | HASH_CONTEXT, DISKQUOTA_OID_HASH); |
| |
| /* Move active table from shared memory to local active table map */ |
| LWLockAcquire(diskquota_locks.active_table_lock, LW_EXCLUSIVE); |
| |
| hash_seq_init(&iter, active_tables_map); |
| |
| /* copy active table from shared memory into local memory */ |
| while ((active_table_file_entry = (DiskQuotaActiveTableFileEntry *)hash_seq_search(&iter)) != NULL) |
| { |
| bool found; |
| DiskQuotaActiveTableFileEntry *entry; |
| |
| if (active_table_file_entry->dbid != MyDatabaseId) |
| { |
| continue; |
| } |
| |
| /* Add the active table entry into local hash table */ |
| entry = hash_search(local_active_table_file_map, active_table_file_entry, HASH_ENTER, &found); |
| if (entry) *entry = *active_table_file_entry; |
| hash_search(active_tables_map, active_table_file_entry, HASH_REMOVE, NULL); |
| } |
| // TODO: hash_seq_term(&iter); |
| LWLockRelease(diskquota_locks.active_table_lock); |
| |
| memset(&ctl, 0, sizeof(ctl)); |
| /* only use Oid as key here, segid is not needed */ |
| ctl.keysize = sizeof(Oid); |
| ctl.entrysize = sizeof(DiskQuotaActiveTableEntry); |
| ctl.hcxt = CurrentMemoryContext; |
| local_active_table_stats_map = diskquota_hash_create("local active table map with relfilenode info", 1024, &ctl, |
| HASH_ELEM | HASH_CONTEXT, DISKQUOTA_OID_HASH); |
| |
| remove_committed_relation_from_cache(); |
| |
| /* |
| * scan whole local map, get the oid of each table and calculate the size |
| * of them |
| */ |
| hash_seq_init(&iter, local_active_table_file_map); |
| |
| while ((active_table_file_entry = (DiskQuotaActiveTableFileEntry *)hash_seq_search(&iter)) != NULL) |
| { |
| bool found; |
| RelFileNode rnode; |
| Oid prelid; |
| |
| /* The session of db1 should not see the table inside db2. */ |
| if (active_table_file_entry->dbid != MyDatabaseId) continue; |
| |
| rnode.dbNode = active_table_file_entry->dbid; |
| rnode.relNode = active_table_file_entry->relfilenode; |
| rnode.spcNode = active_table_file_entry->tablespaceoid; |
| relOid = get_relid_by_relfilenode(rnode); |
| |
| /* If relfilenode is not prepared for some relation, just skip it. */ |
| if (!OidIsValid(relOid)) continue; |
| |
| /* skip system catalog tables */ |
| if (relOid < FirstNormalObjectId) |
| { |
| hash_search(local_active_table_file_map, active_table_file_entry, HASH_REMOVE, NULL); |
| } |
| else |
| { |
| prelid = get_primary_table_oid(relOid, true); |
| active_table_entry = hash_search(local_active_table_stats_map, &prelid, HASH_ENTER, &found); |
| if (active_table_entry && !found) |
| { |
| active_table_entry->reloid = prelid; |
| /* we don't care segid and tablesize here */ |
| active_table_entry->tablesize = 0; |
| active_table_entry->segid = -1; |
| } |
| /* |
| * Do NOT remove relation from the active table map if it is being |
| * altered or its cached relfilenode is stale so that we can check it |
| * again in the next epoch. |
| */ |
| if (!is_relation_being_altered(relOid) && !is_cached_relfilenode_stale(relOid, rnode)) |
| { |
| hash_search(local_active_table_file_map, active_table_file_entry, HASH_REMOVE, NULL); |
| } |
| } |
| } |
| |
| // TODO: hash_seq_term(&iter); |
| |
| /* Adding the remaining relfilenodes back to the map in the shared memory */ |
| LWLockAcquire(diskquota_locks.active_table_lock, LW_EXCLUSIVE); |
| hash_seq_init(&iter, local_active_table_file_map); |
| while ((active_table_file_entry = (DiskQuotaActiveTableFileEntry *)hash_seq_search(&iter)) != NULL) |
| { |
| /* TODO: handle possible ERROR here so that the bgworker will not go down. */ |
| hash_search(active_tables_map, active_table_file_entry, HASH_ENTER, NULL); |
| } |
| /* TODO: hash_seq_term(&iter); */ |
| LWLockRelease(diskquota_locks.active_table_lock); |
| |
| LWLockAcquire(diskquota_locks.altered_reloid_cache_lock, LW_SHARED); |
| hash_seq_init(&iter, altered_reloid_cache); |
| while ((altered_reloid_entry = (Oid *)hash_seq_search(&iter)) != NULL) |
| { |
| bool found; |
| Oid altered_oid = *altered_reloid_entry; |
| if (OidIsValid(*altered_reloid_entry)) |
| { |
| active_table_entry = hash_search(local_active_table_stats_map, &altered_oid, HASH_ENTER, &found); |
| if (!found && active_table_entry) |
| { |
| active_table_entry->reloid = altered_oid; |
| /* We don't care segid and tablesize here. */ |
| active_table_entry->tablesize = 0; |
| active_table_entry->segid = -1; |
| } |
| } |
| hash_search(local_altered_reloid_cache, &altered_oid, HASH_ENTER, NULL); |
| } |
| LWLockRelease(diskquota_locks.altered_reloid_cache_lock); |
| |
| hash_seq_init(&iter, local_altered_reloid_cache); |
| while ((altered_reloid_entry = (Oid *)hash_seq_search(&iter)) != NULL) |
| { |
| if (OidIsValid(*altered_reloid_entry) && !is_relation_being_altered(*altered_reloid_entry)) |
| { |
| hash_search(local_altered_reloid_cache, altered_reloid_entry, HASH_REMOVE, NULL); |
| } |
| } |
| |
| LWLockAcquire(diskquota_locks.altered_reloid_cache_lock, LW_EXCLUSIVE); |
| hash_seq_init(&iter, altered_reloid_cache); |
| while ((altered_reloid_entry = (Oid *)hash_seq_search(&iter)) != NULL) |
| { |
| bool found; |
| Oid altered_reloid = *altered_reloid_entry; |
| hash_search(local_altered_reloid_cache, &altered_reloid, HASH_FIND, &found); |
| if (!found) |
| { |
| hash_search(altered_reloid_cache, &altered_reloid, HASH_REMOVE, NULL); |
| } |
| } |
| LWLockRelease(diskquota_locks.altered_reloid_cache_lock); |
| |
| /* |
| * If cannot convert relfilenode to relOid, put them back to shared memory |
| * and wait for the next check. |
| */ |
| if (hash_get_num_entries(local_active_table_file_map) > 0) |
| { |
| bool found; |
| DiskQuotaActiveTableFileEntry *entry; |
| |
| hash_seq_init(&iter, local_active_table_file_map); |
| LWLockAcquire(diskquota_locks.active_table_lock, LW_EXCLUSIVE); |
| while ((active_table_file_entry = (DiskQuotaActiveTableFileEntry *)hash_seq_search(&iter)) != NULL) |
| { |
| entry = hash_search(active_tables_map, active_table_file_entry, HASH_ENTER_NULL, &found); |
| if (entry) *entry = *active_table_file_entry; |
| } |
| LWLockRelease(diskquota_locks.active_table_lock); |
| } |
| hash_destroy(local_active_table_file_map); |
| hash_destroy(local_altered_reloid_cache); |
| return local_active_table_stats_map; |
| } |
| |
| /* |
| * Load table size info from diskquota.table_size table. |
| * This is called when system startup, disk quota rejectmap |
| * and other shared memory will be warmed up by table_size table. |
| */ |
| static void |
| load_table_size(HTAB *local_table_stats_map) |
| { |
| TupleDesc tupdesc; |
| int i; |
| bool found; |
| TableEntryKey key; |
| DiskQuotaActiveTableEntry *quota_entry; |
| SPIPlanPtr plan; |
| Portal portal; |
| char *sql = "select tableid, size, segid from diskquota.table_size"; |
| |
| if ((plan = SPI_prepare(sql, 0, NULL)) == NULL) |
| ereport(ERROR, (errmsg("[diskquota] SPI_prepare(\"%s\") failed", sql))); |
| if ((portal = SPI_cursor_open(NULL, plan, NULL, NULL, true)) == NULL) |
| ereport(ERROR, (errmsg("[diskquota] SPI_cursor_open(\"%s\") failed", sql))); |
| |
| SPI_cursor_fetch(portal, true, 10000); |
| |
| if (SPI_tuptable == NULL) |
| { |
| ereport(ERROR, (errmsg("[diskquota] load_table_size SPI_cursor_fetch failed"))); |
| } |
| |
| tupdesc = SPI_tuptable->tupdesc; |
| if (tupdesc->natts != 3 || ((tupdesc)->attrs[0]).atttypid != OIDOID || ((tupdesc)->attrs[1]).atttypid != INT8OID || |
| ((tupdesc)->attrs[2]).atttypid != INT2OID) |
| { |
| if (tupdesc->natts != 3) |
| { |
| ereport(WARNING, (errmsg("[diskquota] tupdesc->natts: %d", tupdesc->natts))); |
| } |
| else |
| { |
| ereport(WARNING, (errmsg("[diskquota] attrs: %d, %d, %d", tupdesc->attrs[0].atttypid, |
| tupdesc->attrs[1].atttypid, tupdesc->attrs[2].atttypid))); |
| } |
| ereport(ERROR, (errmsg("[diskquota] table \"table_size\" is corrupted in database \"%s\"," |
| " please recreate diskquota extension", |
| get_database_name(MyDatabaseId)))); |
| } |
| |
| while (SPI_processed > 0) |
| { |
| /* push the table oid and size into local_table_stats_map */ |
| for (i = 0; i < SPI_processed; i++) |
| { |
| HeapTuple tup = SPI_tuptable->vals[i]; |
| Datum dat; |
| Oid reloid; |
| int64 size; |
| int16 segid; |
| bool isnull; |
| |
| dat = SPI_getbinval(tup, tupdesc, 1, &isnull); |
| if (isnull) continue; |
| reloid = DatumGetObjectId(dat); |
| |
| dat = SPI_getbinval(tup, tupdesc, 2, &isnull); |
| if (isnull) continue; |
| size = DatumGetInt64(dat); |
| dat = SPI_getbinval(tup, tupdesc, 3, &isnull); |
| if (isnull) continue; |
| segid = DatumGetInt16(dat); |
| key.reloid = reloid; |
| key.segid = segid; |
| |
| quota_entry = (DiskQuotaActiveTableEntry *)hash_search(local_table_stats_map, &key, HASH_ENTER, &found); |
| quota_entry->reloid = reloid; |
| quota_entry->tablesize = size; |
| quota_entry->segid = segid; |
| } |
| SPI_freetuptable(SPI_tuptable); |
| SPI_cursor_fetch(portal, true, 10000); |
| } |
| |
| SPI_freetuptable(SPI_tuptable); |
| SPI_cursor_close(portal); |
| SPI_freeplan(plan); |
| } |
| |
| /* |
| * Convert a hash map with oids into a string array |
| * This function is used to prepare the second array parameter |
| * of function diskquota_fetch_table_stat. |
| */ |
| static StringInfoData |
| convert_map_to_string(HTAB *local_active_table_oid_maps) |
| { |
| HASH_SEQ_STATUS iter; |
| StringInfoData buffer; |
| DiskQuotaActiveTableEntry *entry; |
| uint32 count = 0; |
| uint32 nitems = hash_get_num_entries(local_active_table_oid_maps); |
| |
| initStringInfo(&buffer); |
| appendStringInfo(&buffer, "{"); |
| |
| hash_seq_init(&iter, local_active_table_oid_maps); |
| |
| while ((entry = (DiskQuotaActiveTableEntry *)hash_seq_search(&iter)) != NULL) |
| { |
| count++; |
| if (count != nitems) |
| { |
| appendStringInfo(&buffer, "%d,", entry->reloid); |
| } |
| else |
| { |
| appendStringInfo(&buffer, "%d", entry->reloid); |
| } |
| } |
| appendStringInfo(&buffer, "}"); |
| |
| return buffer; |
| } |
| |
| /* |
| * Get active table size from all the segments based on |
| * active table oid list. |
| * Function diskquota_fetch_table_stat is called to calculate |
| * the table size on the fly. |
| */ |
| static HTAB * |
| pull_active_list_from_seg(void) |
| { |
| CdbPgResults cdb_pgresults = {NULL, 0}; |
| int i, j; |
| char *sql = NULL; |
| HTAB *local_active_table_oid_map = NULL; |
| HASHCTL ctl; |
| DiskQuotaActiveTableEntry *entry; |
| |
| memset(&ctl, 0, sizeof(ctl)); |
| ctl.keysize = sizeof(Oid); |
| ctl.entrysize = sizeof(DiskQuotaActiveTableEntry); |
| ctl.hcxt = CurrentMemoryContext; |
| local_active_table_oid_map = diskquota_hash_create("local active table map with relfilenode info", 1024, &ctl, |
| HASH_ELEM | HASH_CONTEXT, DISKQUOTA_OID_HASH); |
| |
| /* first get all oid of tables which are active table on any segment */ |
| sql = "select * from diskquota.diskquota_fetch_table_stat(0, '{}'::oid[])"; |
| |
| /* any errors will be catch in upper level */ |
| CdbDispatchCommand(sql, DF_NONE, &cdb_pgresults); |
| for (i = 0; i < cdb_pgresults.numResults; i++) |
| { |
| Oid reloid; |
| bool found; |
| |
| PGresult *pgresult = cdb_pgresults.pg_results[i]; |
| |
| if (PQresultStatus(pgresult) != PGRES_TUPLES_OK) |
| { |
| cdbdisp_clearCdbPgResults(&cdb_pgresults); |
| ereport(ERROR, (errmsg("[diskquota] fetching active tables, encounter unexpected result from segment: %d", |
| PQresultStatus(pgresult)))); |
| } |
| |
| /* push the active table oid into local_active_table_oid_map */ |
| for (j = 0; j < PQntuples(pgresult); j++) |
| { |
| reloid = atooid(PQgetvalue(pgresult, j, 0)); |
| |
| entry = (DiskQuotaActiveTableEntry *)hash_search(local_active_table_oid_map, &reloid, HASH_ENTER, &found); |
| |
| if (!found) |
| { |
| entry->reloid = reloid; |
| entry->tablesize = 0; |
| entry->segid = -1; |
| } |
| } |
| } |
| cdbdisp_clearCdbPgResults(&cdb_pgresults); |
| |
| return local_active_table_oid_map; |
| } |
| |
| /* |
| * Get active table list from all the segments. |
| * Since when loading data, there is case where only subset for |
| * segment doing the real loading. As a result, the same table |
| * maybe active on some segments while not active on others. We |
| * haven't store the table size for each segment on master(to save |
| * memory), so when re-calculate the table size, we need to sum the |
| * table size on all of the segments. |
| */ |
| static void |
| pull_active_table_size_from_seg(HTAB *local_table_stats_map, char *active_oid_array) |
| { |
| CdbPgResults cdb_pgresults = {NULL, 0}; |
| StringInfoData sql_command; |
| int i; |
| int j; |
| |
| initStringInfo(&sql_command); |
| appendStringInfo(&sql_command, "select * from diskquota.diskquota_fetch_table_stat(1, '%s'::oid[])", |
| active_oid_array); |
| CdbDispatchCommand(sql_command.data, DF_NONE, &cdb_pgresults); |
| pfree(sql_command.data); |
| |
| SEGCOUNT = cdb_pgresults.numResults; |
| if (SEGCOUNT <= 0) |
| { |
| ereport(ERROR, (errmsg("[diskquota] there is no active segment, SEGCOUNT is %d", SEGCOUNT))); |
| } |
| |
| /* sum table size from each segment into local_table_stats_map */ |
| for (i = 0; i < cdb_pgresults.numResults; i++) |
| { |
| Size tableSize; |
| bool found; |
| Oid reloid; |
| int segId; |
| TableEntryKey key; |
| DiskQuotaActiveTableEntry *entry; |
| |
| PGresult *pgresult = cdb_pgresults.pg_results[i]; |
| |
| if (PQresultStatus(pgresult) != PGRES_TUPLES_OK) |
| { |
| cdbdisp_clearCdbPgResults(&cdb_pgresults); |
| ereport(ERROR, (errmsg("[diskquota] fetching active tables, encounter unexpected result from segment: %d", |
| PQresultStatus(pgresult)))); |
| } |
| |
| for (j = 0; j < PQntuples(pgresult); j++) |
| { |
| reloid = atooid(PQgetvalue(pgresult, j, 0)); |
| tableSize = (Size)atoll(PQgetvalue(pgresult, j, 1)); |
| key.reloid = reloid; |
| /* for diskquota extension version is 1.0, pgresult doesn't contain segid */ |
| if (PQnfields(pgresult) == 3) |
| { |
| /* get the segid, tablesize for each table */ |
| segId = atoi(PQgetvalue(pgresult, j, 2)); |
| key.segid = segId; |
| entry = (DiskQuotaActiveTableEntry *)hash_search(local_table_stats_map, &key, HASH_ENTER, &found); |
| |
| if (!found) |
| { |
| /* receive table size info from the first segment */ |
| entry->reloid = reloid; |
| entry->segid = segId; |
| } |
| entry->tablesize = tableSize; |
| } |
| |
| /* when segid is -1, the tablesize is the sum of tablesize of master and all segments */ |
| key.segid = -1; |
| entry = (DiskQuotaActiveTableEntry *)hash_search(local_table_stats_map, &key, HASH_ENTER, &found); |
| |
| if (!found) |
| { |
| /* receive table size info from the first segment */ |
| entry->reloid = reloid; |
| entry->tablesize = tableSize; |
| entry->segid = -1; |
| } |
| else |
| { |
| /* sum table size from all the segments */ |
| entry->tablesize = entry->tablesize + tableSize; |
| } |
| } |
| } |
| cdbdisp_clearCdbPgResults(&cdb_pgresults); |
| return; |
| } |