blob: 8be3a08957b4066e7ad640ce7bc16e6094eed2aa [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "postgres.h"
#include "miscadmin.h"
#include "access/read_cache.h"
#include "cdb/cdbvars.h"
#include "optimizer/clauses.h"
#include "optimizer/newPlanner.h"
#include "storage/shmem.h"
#include "utils/hsearch.h"
#include "executor/cwrapper/cached-result.h"
#define READ_CACHE_SIZE (102400)
// oid->revision num
static HTAB *ReadCacheHash = NULL;
typedef struct ReadCacheHashData {
Oid relid;
int64_t revisionNum;
} ReadCacheHashData;
static HTAB *ReadCacheHashEntryPendingRevise = NULL;
typedef struct ReadCacheHashEntryPendingReviseData {
Oid relid;
bool drop;
int64_t count;
} ReadCacheHashEntryPendingReviseData;
static int64_t ReadCacheLookupHashEntry(Oid relid);
void InitReadCache(void) {
HASHCTL info;
int hash_flags;
MemSet(&info, 0, sizeof(info));
info.keysize = sizeof(Oid);
info.entrysize = sizeof(ReadCacheHashData);
info.hash = tag_hash;
hash_flags = (HASH_ELEM | HASH_FUNCTION);
ReadCacheHash = ShmemInitHash("Read Cache", READ_CACHE_SIZE, READ_CACHE_SIZE,
&info, hash_flags);
if (!ReadCacheHash)
ereport(FATAL, (errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("not enough shared memory for read cache init")));
CacheFileCleanUp();
}
Size ReadCacheShmemSize(void) {
return hash_estimate_size((Size)READ_CACHE_SIZE, sizeof(ReadCacheHashData));
}
void ReadCacheHashEntryReviseOnCommit(Oid relid, bool dropTable) {
if (Gp_role != GP_ROLE_DISPATCH) return;
if (NULL == ReadCacheHashEntryPendingRevise) {
HASHCTL info;
int hash_flags;
MemSet(&info, 0, sizeof(info));
info.keysize = sizeof(Oid);
info.entrysize = sizeof(ReadCacheHashEntryPendingReviseData);
info.hash = tag_hash;
hash_flags = (HASH_ELEM | HASH_FUNCTION);
ReadCacheHashEntryPendingRevise =
hash_create("ReadCacheHashEntryPendingRevise", 10, &info, hash_flags);
if (!ReadCacheHashEntryPendingRevise)
elog(ERROR, "failed to create ReadCacheHashEntryPendingRevise");
}
bool found;
ReadCacheHashEntryPendingReviseData *node =
(ReadCacheHashEntryPendingReviseData *)hash_search(
ReadCacheHashEntryPendingRevise, (void *)&relid, HASH_ENTER, &found);
if (found) {
if (dropTable) node->drop = true;
++node->count;
} else {
node->relid = relid;
node->drop = false;
node->count = 1;
}
}
void AtEOXact_ReadCache(bool commit) {
if (Gp_role != GP_ROLE_DISPATCH) return;
if (ReadCacheHashEntryPendingRevise) {
if (commit && hash_get_num_entries(ReadCacheHashEntryPendingRevise) > 0) {
ReadCacheHashEntryPendingReviseData *pending;
HASH_SEQ_STATUS status;
hash_seq_init(&status, ReadCacheHashEntryPendingRevise);
LWLockAcquire(ReadCacheLock, LW_EXCLUSIVE);
while ((pending = (ReadCacheHashEntryPendingReviseData *)hash_seq_search(
&status)) != NULL) {
bool found;
if (pending->drop) {
hash_search(ReadCacheHash, (void *)&pending->relid, HASH_REMOVE,
&found);
} else {
ReadCacheHashData *entry = (ReadCacheHashData *)hash_search(
ReadCacheHash, (void *)&pending->relid, HASH_ENTER_NULL, &found);
if (!entry)
ereport(
FATAL,
(errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("not enough shared memory for read cache add entry")));
if (found) {
entry->revisionNum += pending->count;
} else {
entry->revisionNum = pending->count;
}
}
}
LWLockRelease(ReadCacheLock);
}
hash_destroy(ReadCacheHashEntryPendingRevise);
ReadCacheHashEntryPendingRevise = NULL;
}
}
void resetReadCache(bool enable) {
if (enable_secure_filesystem || !magma_cache_read) enable = false;
CachedResultReset(MyCachedResult, enable);
}
void initReadCache(PlannedStmt *plannedStmt, const char *sourceText) {
if (magma_cache_read && CachedResultSafeGuard(MyCachedResult)) {
// check the relation oid the plan depends on
bool toContinue = (plannedStmt->relationOids != NIL);
ListCell *cell = NULL;
foreach (cell, plannedStmt->relationOids) {
if (!RelationIsMagmaTable2(lfirst_oid(cell))) {
toContinue = false;
break;
}
}
if (toContinue) {
CommonPlanContext ctx;
ctx.base.node = (Node *)plannedStmt;
toContinue = !plan_tree_walker((Node *)plannedStmt->planTree,
check_volatile_functions, &ctx);
}
if (toContinue) {
outfast_workfile_mgr_init(plannedStmt->rtable);
int32_t len1 = -1;
char *str1 = nodeToBinaryStringFast((Node *)plannedStmt->planTree, &len1);
int32_t len2 = -1;
char *str2 = nodeToBinaryStringFast((Node *)plannedStmt->subplans, &len2);
int32_t len3 = -1;
char *str3 = nodeToBinaryStringFast((Node *)plannedStmt->rtable, &len3);
outfast_workfile_mgr_end();
int32_t relNum = list_length(plannedStmt->relationOids);
int64_t *relids = (int64_t *)palloc(relNum * sizeof(int64_t));
int64_t *revisions = (int64_t *)palloc(relNum * sizeof(int64_t));
LWLockAcquire(ReadCacheLock, LW_SHARED);
int32_t i = 0;
foreach (cell, plannedStmt->relationOids) {
Oid relid = lfirst_oid(cell);
relids[i] = relid;
revisions[i] = ReadCacheLookupHashEntry(relid);
++i;
}
LWLockRelease(ReadCacheLock);
CachedResultInit(MyCachedResult, GetCurrentTransactionId(), relids,
revisions, relNum, sourceText, str1, len1, str2, len2,
str3, len3);
pfree(relids);
pfree(revisions);
} else {
CachedResultReset(MyCachedResult, false);
}
}
}
bool readCacheEnabled() {
return enableOushuDbExtensiveFeatureSupport() ?
CachedResultIsRead(MyCachedResult) : false;
}
bool readCacheEof() { return CachedResultReadEof(MyCachedResult); }
char *readCache(int32_t *len) { return CachedResultRead(MyCachedResult, len); }
bool writeCacheEnabled() {
return enableOushuDbExtensiveFeatureSupport() ?
CachedResultIsWrite(MyCachedResult) : false;
}
void writeCache(const char *buf, int32_t size) {
CachedResultWrite(MyCachedResult, buf, size);
}
void commitReadCache() {
if (writeCacheEnabled()) {
CachedResultEndWrite(MyCachedResult);
}
resetReadCache(false);
}
static int64_t ReadCacheLookupHashEntry(Oid relid) {
int64_t revisionNum = 0;
bool found;
LWLockAcquire(ReadCacheLock, LW_SHARED);
ReadCacheHashData *entry = (ReadCacheHashData *)hash_search(
ReadCacheHash, (void *)&relid, HASH_FIND, &found);
LWLockRelease(ReadCacheLock);
if (entry) revisionNum = entry->revisionNum;
if (ReadCacheHashEntryPendingRevise) {
ReadCacheHashEntryPendingReviseData *node =
(ReadCacheHashEntryPendingReviseData *)hash_search(
ReadCacheHashEntryPendingRevise, (void *)&relid, HASH_FIND, &found);
if (node) {
revisionNum += node->count;
revisionNum *= -1;
}
}
return revisionNum;
}