blob: 7f34d46943d54c3ed0b1b71e49fc58e89570023b [file]
/*-------------------------------------------------------------------------
*
* directorycmds.c
* directory table creation/manipulation commands
*
* Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
*
*
* IDENTIFICATION
* src/backend/commands/directorycmds.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include <sys/stat.h>
#include "access/htup_details.h"
#include "access/reloptions.h"
#include "access/table.h"
#include "access/xact.h"
#include "catalog/catalog.h"
#include "catalog/dependency.h"
#include "catalog/heap.h"
#include "catalog/index.h"
#include "catalog/indexing.h"
#include "catalog/objectaccess.h"
#include "catalog/oid_dispatch.h"
#include "catalog/pg_collation.h"
#include "catalog/pg_directory_table.h"
#include "catalog/pg_opclass.h"
#include "catalog/pg_proc.h"
#include "catalog/pg_tablespace.h"
#include "catalog/pg_type.h"
#include "catalog/pg_user_mapping.h"
#include "catalog/storage_directory_table.h"
#include "cdb/cdbdisp_query.h"
#include "cdb/cdbdispatchresult.h"
#include "cdb/cdboidsync.h"
#include "cdb/cdbvars.h"
#include "commands/defrem.h"
#include "commands/dirtablecmds.h"
#include "commands/tablespace.h"
#include "executor/executor.h"
#include "miscadmin.h"
#include "nodes/makefuncs.h"
#include "parser/parse_func.h"
#include "storage/ufile.h"
#include "tcop/utility.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/faultinjector.h"
#include "utils/lsyscache.h"
#include "utils/rel.h"
#include "utils/snapmgr.h"
#include "utils/syscache.h"
#include "libpq-fe.h"
#include "cdb/cdbdisp_query.h"
#include "cdb/cdbdispatchresult.h"
#include "cdb/cdbvars.h"
#include "funcapi.h"
/*
 * State carried between successive calls of the directory_table()
 * set-returning function.  It is allocated in the SRF's multi-call memory
 * context on the first call.
 */
struct TableFunctionContext
{
	Relation	relation;		/* the directory table being scanned */
	TableScanDesc scanDesc;		/* scan over relation */
	TupleTableSlot *slot;		/* slot that receives each scanned row */
	DirectoryTable *dirTable;	/* directory table metadata (location, spcId) */
};

typedef struct TableFunctionContext TableFunctionContext;

Datum		directory_table(PG_FUNCTION_ARGS);
/*
 * chooseTableSpace
 *		Pick the tablespace for a new directory table and verify that the
 *		current user may create objects in it.
 *
 * Preference order: the tablespace named in stmt (given on the command line
 * or passed down by dispatch), then GetDefaultTablespace() for the
 * relation's persistence, then the database's default tablespace.
 *
 * Raises an error if the user lacks CREATE privilege on a tablespace other
 * than the database default, or if the chosen tablespace is pg_global.
 */
static Oid
chooseTableSpace(CreateDirectoryTableStmt *stmt)
{
	Oid			tablespaceId = InvalidOid;

	/* An explicitly named tablespace wins; an unknown name is an error. */
	if (stmt->tablespacename)
		tablespaceId = get_tablespace_oid(stmt->tablespacename, false);

	/* Otherwise ask for the session default for this persistence. */
	if (!OidIsValid(tablespaceId))
		tablespaceId = GetDefaultTablespace(stmt->base.relation->relpersistence, false);

	/* Still nothing?  Use the database's default tablespace. */
	if (!OidIsValid(tablespaceId))
		tablespaceId = MyDatabaseTableSpace;

	/* Check permissions except when using the database's default */
	if (OidIsValid(tablespaceId) && tablespaceId != MyDatabaseTableSpace)
	{
		AclResult	aclresult;

		aclresult = pg_tablespace_aclcheck(tablespaceId, GetUserId(),
										   ACL_CREATE);
		if (aclresult != ACLCHECK_OK)
			aclcheck_error(aclresult, OBJECT_TABLESPACE,
						   get_tablespace_name(tablespaceId));
	}

	/* In all cases disallow placing user relations in pg_global */
	if (tablespaceId == GLOBALTABLESPACE_OID)
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg("only shared relations can be placed in pg_global tablespace")));

	return tablespaceId;
}
/*
 * CreateDirectoryTable
 *		Record the directory table relId in pg_directory_table and make sure
 *		its backing directory exists.
 *
 * When stmt->location is given, the path is that location prefixed with
 * base/, global/ or pg_tblspc/ depending on the chosen tablespace.
 * Otherwise it is derived from relId by UFileFormatPathName().  A
 * dependency on the chosen tablespace is recorded as well.
 *
 * For tablespaces using the local file access method, an already existing
 * path is rejected with ERRCODE_DUPLICATE_OBJECT.
 */
void
CreateDirectoryTable(CreateDirectoryTableStmt *stmt, Oid relId)
{
	Relation	dirRelation;
	Datum		values[Natts_pg_directory_table];
	bool		nulls[Natts_pg_directory_table];
	HeapTuple	tuple;
	char	   *dirTablePath;
	Oid			spcId = chooseTableSpace(stmt);

	if (stmt->location)
	{
		if (spcId == InvalidOid ||
			spcId == DEFAULTTABLESPACE_OID)
			dirTablePath = psprintf("base/%s", stmt->location);
		else if (spcId == GLOBALTABLESPACE_OID)
			dirTablePath = psprintf("global/%s", stmt->location);
		else
			dirTablePath = psprintf("pg_tblspc/%s", stmt->location);
	}
	else
	{
		RelFileNode relFileNode = {0};

		relFileNode.spcNode = spcId;
		relFileNode.dbNode = MyDatabaseId;
		dirTablePath = UFileFormatPathName(relId, &relFileNode);
	}

	/*
	 * Acquire DirectoryTableLock to ensure that no DROP DIRECTORY TABLE or
	 * CREATE DIRECTORY TABLE is running concurrently.  The lock is taken
	 * before the existence check below, so that checking for the path and
	 * creating it cannot interleave with another backend doing the same.
	 */
	LWLockAcquire(DirectoryTableLock, LW_EXCLUSIVE);

	/*
	 * Only reject a pre-existing path for the local file access method.  The
	 * handler is tested first so that other storage back ends are never
	 * probed for existence.
	 */
	if (GetTablespaceFileHandler(spcId) == &localFileAm &&
		UFileExists(spcId, dirTablePath))
		ereport(ERROR,
				(errcode(ERRCODE_DUPLICATE_OBJECT),
				 errmsg("directory table path \"%s\" already exists",
						dirTablePath)));

	UFileEnsurePath(spcId, dirTablePath);

	/*
	 * Advance command counter to ensure the pg_attribute tuple is visible;
	 * the tuple might be updated to add constraints in previous step.
	 */
	CommandCounterIncrement();

	dirRelation = table_open(DirectoryTableRelationId, RowExclusiveLock);

	/*
	 * Insert tuple into pg_directory_table.
	 */
	memset(values, 0, sizeof(values));
	memset(nulls, false, sizeof(nulls));

	values[Anum_pg_directory_table_dttablespace - 1] = ObjectIdGetDatum(spcId);
	values[Anum_pg_directory_table_dtrelid - 1] = ObjectIdGetDatum(relId);
	values[Anum_pg_directory_table_dtlocation - 1] = CStringGetTextDatum(dirTablePath);

	tuple = heap_form_tuple(dirRelation->rd_att, values, nulls);
	CatalogTupleInsert(dirRelation, tuple);

	/* Add dependency with tablespace */
	recordDependencyOnTablespace(RelationRelationId, relId, spcId);

	heap_freetuple(tuple);
	table_close(dirRelation, RowExclusiveLock);

	LWLockRelease(DirectoryTableLock);
}
/*
 * getFileContent
 *		Read the file scopedFileUrl in tablespace spcId and return its bytes
 *		as a bytea Datum allocated in the current memory context.
 *
 * The result buffer is sized from UFileSize() right after the open.  Reads
 * are bounded by that size, so a file that grows concurrently cannot overrun
 * the buffer; its extra bytes are simply not returned.  If the file shrinks,
 * the varlena length is trimmed to the bytes actually read.
 *
 * Raises an error if the file cannot be opened, read or closed, or if it is
 * too large to fit in a single bytea.
 */
static Datum
getFileContent(Oid spcId, char *scopedFileUrl)
{
#define FILE_READ_CHUNK_SIZE 4096
	char		errorMessage[256];
	char	   *data;
	int64_t		fileSize;
	int64_t		curPos = 0;
	bytea	   *content;
	UFile	   *file;

	file = UFileOpen(spcId,
					 scopedFileUrl,
					 O_RDONLY,
					 errorMessage,
					 sizeof(errorMessage));
	if (file == NULL)
		ereport(ERROR,
				(errcode(ERRCODE_INTERNAL_ERROR),
				 errmsg("failed to open file \"%s\": %s", scopedFileUrl, errorMessage)));

	fileSize = UFileSize(file);

	/*
	 * The whole file must fit in one palloc'd varlena.  A negative size is
	 * rejected too.  NOTE(review): presumably a negative value means
	 * UFileSize() failed; confirm.
	 */
	if (fileSize < 0 || (Size) fileSize > MaxAllocSize - VARHDRSZ)
		ereport(ERROR,
				(errcode(ERRCODE_OUT_OF_MEMORY),
				 errmsg("out of memory")));

	/* palloc() never returns NULL; it raises ERROR on failure itself. */
	content = (bytea *) palloc(fileSize + VARHDRSZ);
	data = VARDATA(content);

	/* Read straight into the result, never past the size allocated above. */
	while (curPos < fileSize)
	{
		int			toRead = (int) Min(fileSize - curPos, (int64_t) FILE_READ_CHUNK_SIZE);
		int			bytesRead;

		bytesRead = UFileRead(file, data + curPos, toRead);
		if (bytesRead < 0)
			ereport(ERROR,
					(errcode(ERRCODE_INTERNAL_ERROR),
					 errmsg("failed to read file \"%s\": %s", scopedFileUrl, UFileGetLastError(file))));
		if (bytesRead == 0)
			break;				/* EOF earlier than expected: file shrank */
		curPos += bytesRead;
	}

	/* Only claim the bytes that were actually filled in. */
	SET_VARSIZE(content, curPos + VARHDRSZ);

	if (UFileClose(file) < 0)
		ereport(ERROR,
				(errcode(ERRCODE_INTERNAL_ERROR),
				 errmsg("failed to close the file \"%s\": %s", scopedFileUrl, UFileGetLastError(file))));
	pfree(file);

	return PointerGetDatum(content);
#undef FILE_READ_CHUNK_SIZE
}
/*
 * getScopedFileUrl
 *		Build the storage path of a file inside a directory table by joining
 *		the table's location and the file's relative path with '/'.
 *
 * The returned string is newly allocated by psprintf().
 */
static char *
getScopedFileUrl(DirectoryTable *dirTable, char *relativePath)
{
	const char *baseLocation = dirTable->location;
	char	   *url;

	url = psprintf("%s/%s", baseLocation, relativePath);

	return url;
}
Datum
directory_table(PG_FUNCTION_ARGS)
{
#define DIRECTORY_TABLE_FUNCTION_COLUMNS 7
Oid relId = PG_GETARG_OID(0);
Datum values[DIRECTORY_TABLE_FUNCTION_COLUMNS];
bool nulls[DIRECTORY_TABLE_FUNCTION_COLUMNS];
HeapTuple tuple;
Datum result;
FuncCallContext *funcCtx;
TableFunctionContext *context;
if (SRF_IS_FIRSTCALL())
{
Snapshot snapshot;
TupleDesc newTupDesc;
MemoryContext oldContext;
funcCtx = SRF_FIRSTCALL_INIT();
oldContext = MemoryContextSwitchTo(funcCtx->multi_call_memory_ctx);
newTupDesc = CreateTemplateTupleDesc(DIRECTORY_TABLE_FUNCTION_COLUMNS);
TupleDescInitEntry(newTupDesc, (AttrNumber) 1, "scoped_file_url", TEXTOID, -1, 0);
TupleDescInitEntry(newTupDesc, (AttrNumber) 2, "relative_path", TEXTOID, -1, 0);
TupleDescInitEntry(newTupDesc, (AttrNumber) 3, "tag", TEXTOID, -1, 0);
TupleDescInitEntry(newTupDesc, (AttrNumber) 4, "size", INT8OID, -1, 0);
TupleDescInitEntry(newTupDesc, (AttrNumber) 5, "last_modified", TIMESTAMPTZOID, -1, 0);
TupleDescInitEntry(newTupDesc, (AttrNumber) 6, "md5", TEXTOID, -1, 0);
TupleDescInitEntry(newTupDesc, (AttrNumber) 7, "content", BYTEAOID, -1, 0);
funcCtx->tuple_desc = BlessTupleDesc(newTupDesc);
context = (TableFunctionContext *) palloc0(sizeof(TableFunctionContext));
context->relation = table_open(relId, AccessShareLock);
SIMPLE_FAULT_INJECTOR("directory_table_inject");
if (!RelationIsDirectoryTable(relId))
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("'%s' is not a directory table",
RelationGetRelationName(context->relation))));
context->slot = MakeSingleTupleTableSlot(RelationGetDescr(context->relation),
table_slot_callbacks(context->relation));
context->dirTable = GetDirectoryTable(RelationGetRelid(context->relation));
snapshot = GetLatestSnapshot();
context->scanDesc = table_beginscan(context->relation, snapshot, 0, NULL);
funcCtx->user_fctx = (void *) context;
MemoryContextSwitchTo(oldContext);
}
funcCtx = SRF_PERCALL_SETUP();
context = (TableFunctionContext *) funcCtx->user_fctx;
if (table_scan_getnextslot(context->scanDesc, ForwardScanDirection, context->slot))
{
Datum attrRelativePath;
Datum attrSize;
Datum attrLastModified;
Datum attrMd5Sum;
Datum attrTags;
bool isNull;
char *scopedFileUrl;
slot_getallattrs(context->slot);
attrRelativePath = slot_getattr(context->slot, 1, &isNull);
Assert(isNull == false);
attrSize = slot_getattr(context->slot, 2, &isNull);
Assert(isNull == false);
attrLastModified = slot_getattr(context->slot, 3, &isNull);
Assert(isNull == false);
attrMd5Sum = slot_getattr(context->slot, 4, &isNull);
Assert(isNull == false);
attrTags = slot_getattr(context->slot, 5, &isNull);
MemSet(values, 0, sizeof(values));
MemSet(nulls, 0, sizeof(nulls));
scopedFileUrl = getScopedFileUrl(context->dirTable, TextDatumGetCString(attrRelativePath));
values[0] = PointerGetDatum(cstring_to_text(scopedFileUrl));
values[1] = attrRelativePath;
values[2] = attrTags;
nulls[2] = isNull;
values[3] = attrSize;
values[4] = attrLastModified;
values[5] = attrMd5Sum;
values[6] = getFileContent(context->dirTable->spcId, scopedFileUrl);
tuple = heap_form_tuple(funcCtx->tuple_desc, values, nulls);
result = HeapTupleGetDatum(tuple);
SRF_RETURN_NEXT(funcCtx, result);
}
table_endscan(context->scanDesc);
ExecDropSingleTupleTableSlot(context->slot);
table_close(context->relation, AccessShareLock);
pfree(context);
funcCtx->user_fctx = NULL;
SRF_RETURN_DONE(funcCtx);
}
Datum
remove_file_segment(PG_FUNCTION_ARGS)
{
Relation relation;
ScanKeyData scankey;
SysScanDesc scan;
HeapTuple tuple;
Oid indexOid;
List *indexOids;
Oid relId;
char *relativePath;
char *fullPathName;
bool exist = false;
DirectoryTable *dirTable;
if (PG_ARGISNULL(0))
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("relation name cannot be NULL")));
relId = PG_GETARG_OID(0);
if (PG_ARGISNULL(1))
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("file path cannot be null")));
relativePath = PG_GETARG_CSTRING(1);
dirTable = GetDirectoryTable(relId);
relation = table_open(relId, AccessExclusiveLock);
SIMPLE_FAULT_INJECTOR("remove_file_inject");
indexOids = RelationGetIndexList(relation);
Assert(list_length(indexOids) == 1);
indexOid = list_nth_oid(indexOids, 0);
ScanKeyInit(&scankey,
1, /* relative_path */
BTEqualStrategyNumber, F_TEXTEQ,
CStringGetTextDatum(relativePath));
scan = systable_beginscan(relation, indexOid, true, NULL, 1, &scankey);
tuple = systable_getnext(scan);
if (HeapTupleIsValid(tuple))
{
CatalogTupleDelete(relation, &tuple->t_self);
fullPathName = psprintf("%s/%s", dirTable->location, relativePath);
UFileAddPendingDelete(relation, dirTable->spcId, fullPathName, true);
exist = true;
}
systable_endscan(scan);
list_free(indexOids);
table_close(relation, NoLock);
PG_RETURN_BOOL(exist);
}
/*
 * remove_file
 *		QD entry point: remove a file from a directory table on all segments.
 *
 * Dispatches pg_catalog.remove_file_segment(relId, relativePath) to the
 * segments.  Returns true if at least one segment reported that it deleted
 * a matching row.  Must be called on the QD.  The relation is locked
 * AccessExclusive until the end of the transaction.
 */
Datum
remove_file(PG_FUNCTION_ARGS)
{
	Relation	relation;
	Oid			relId;
	char	   *relativePath;
	char	   *query;
	int			numDeletes = 0;
	int			i;
	CdbPgResults cdbPgresults = {NULL, 0};

	if (PG_ARGISNULL(0))
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg("relation name cannot be NULL")));
	relId = PG_GETARG_OID(0);

	if (PG_ARGISNULL(1))
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg("file path cannot be null")));
	relativePath = PG_GETARG_CSTRING(1);

	/* Reject the call before taking a heavyweight lock on the table. */
	if (Gp_role != GP_ROLE_DISPATCH)
		ereport(ERROR,
				(errcode(ERRCODE_GP_COMMAND_ERROR),
				 errmsg("remove_file() could only be called on QD")));

	relation = table_open(relId, AccessExclusiveLock);

	/*
	 * relativePath is user input.  Quote it as a proper SQL literal so that
	 * quotes or backslashes in a file name cannot break out of the
	 * dispatched statement.
	 */
	query = psprintf("select pg_catalog.remove_file_segment(%u, %s)",
					 relId, quote_literal_cstr(relativePath));

	CdbDispatchCommand(query, DF_WITH_SNAPSHOT | DF_CANCEL_ON_ERROR, &cdbPgresults);
	pfree(query);

	for (i = 0; i < cdbPgresults.numResults; i++)
	{
		int			ntuples;
		int			nfields;
		struct pg_result *pgresult = cdbPgresults.pg_results[i];
		ExecStatusType status = PQresultStatus(pgresult);

		if (status != PGRES_TUPLES_OK)
		{
			cdbdisp_clearCdbPgResults(&cdbPgresults);
			ereport(ERROR,
					(errmsg("unexpected result from segment: %d", status)));
		}

		ntuples = PQntuples(pgresult);
		nfields = PQnfields(pgresult);
		if (ntuples != 1 || nfields != 1)
		{
			cdbdisp_clearCdbPgResults(&cdbPgresults);
			ereport(ERROR,
					(errmsg("unexpected shape of result from segment (%d rows, %d cols)",
							ntuples, nfields)));
		}

		/* Each segment returns one boolean: did it delete a matching row? */
		if (!PQgetisnull(pgresult, 0, 0) &&
			DatumGetBool(DirectFunctionCall1(boolin,
											 CStringGetDatum(PQgetvalue(pgresult, 0, 0)))))
			numDeletes++;
	}

	cdbdisp_clearCdbPgResults(&cdbPgresults);
	table_close(relation, NoLock);

	PG_RETURN_BOOL(numDeletes > 0);
}