/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/*-------------------------------------------------------------------------
*
* cdbquerycontextdispatching.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include <sys/fcntl.h>
#include "access/genam.h"
#include "access/heapam.h"
#include "access/pxfuriparser.h"
#include "access/parquetsegfiles.h"
#include "access/tuptoaster.h"
#include "catalog/aoseg.h"
#include "catalog/catalog.h"
#include "catalog/catquery.h"
#include "catalog/pg_amop.h"
#include "catalog/pg_amproc.h"
#include "catalog/pg_aggregate.h"
#include "catalog/pg_authid.h"
#include "catalog/pg_language.h"
#include "catalog/pg_type.h"
#include "catalog/pg_attrdef.h"
#include "catalog/pg_attribute_encoding.h"
#include "catalog/pg_constraint.h"
#include "catalog/pg_exttable.h"
#include "catalog/pg_magic_oid.h"
#include "catalog/pg_namespace.h"
#include "catalog/pg_opclass.h"
#include "catalog/pg_operator.h"
#include "catalog/pg_proc.h"
#include "catalog/pg_tablespace.h"
#include "catalog/toasting.h"
#include "cdb/cdbdispatchedtablespaceinfo.h"
#include "cdb/cdbfilesystemcredential.h"
#include "cdb/cdbpartition.h"
#include "cdb/cdbquerycontextdispatching.h"
#include "cdb/cdbvars.h"
#include "commands/dbcommands.h"
#include "libpq/pqformat.h"
#include "miscadmin.h"
#include "optimizer/prep.h"
#include "storage/fd.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/syscache.h"
#include "utils/uri.h"
/**
* all files opened on shared storage will be removed after the transaction terminates.
*/
struct DropListItemData
{
char *path;
struct DropListItemData *next;
};
typedef struct DropListItemData DropListItemData;
typedef struct DropListItemData *DropListItem;
static DropListItem DropList = NULL;
int QueryContextDispatchingSizeMemoryLimit = 100 * 1024; /* KB */
/*
* |------type id : 1 byte---------|
* |---content length : 4 byte-----|
* |---variable length content-----|
*/
/*
* |------type id : 1 byte---------|
* |-----fixed length content -----|
*/
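/*
* e.g. a TupleType item is serialized as:
* |---TupleType : 1 byte---|---length : 4 byte---|---relid : 4 byte---|
* |---contentid : 4 byte---|---HeapTupleData---|---tuple body---|
* where length = sizeof(Oid) + sizeof(int32) + sizeof(HeapTupleData) + t_len
* (see AddTupleToContextInfo()).
*/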
enum QueryContextDispatchingItemType
{
MasterXid, TablespaceLocation, TupleType, EmptyTable, FileSystemCredential, Namespace
};
typedef enum QueryContextDispatchingItemType QueryContextDispatchingItemType;
enum QueryContextDispatchingObjType
{
RelationType,
TablespaceType,
NamespaceType,
ProcType,
AggType,
LangType,
TypeType,
OpType,
OpClassType
};
typedef enum QueryContextDispatchingObjType QueryContextDispatchingObjType;
struct QueryContextDispatchingHashKey
{
Oid objid;
QueryContextDispatchingObjType type;
};
typedef struct QueryContextDispatchingHashKey QueryContextDispatchingHashKey;
struct QueryContextDispatchingHashEntry
{
QueryContextDispatchingHashKey key;
Oid aoseg_relid; /* pg_aoseg_* or pg_aocsseg_* relid */
/* aoseg_index_relid in future */
};
typedef struct QueryContextDispatchingHashEntry QueryContextDispatchingHashEntry;
static void
prepareDispatchedCatalogAuthUser(QueryContextInfo *ctx, Oid userOid);
static void
prepareDispatchedCatalogFunction(QueryContextInfo *ctx, Oid procOid);
static void
prepareDispatchedCatalogLanguage(QueryContextInfo *ctx, Oid langOid);
static void
prepareDispatchedCatalogOpClassForType(QueryContextInfo *ctx, Oid typeOid);
static void
prepareDispatchedCatalogType(QueryContextInfo *ctx, Oid typeOid);
static void
prepareDispatchedCatalogOperator(QueryContextInfo *ctx, Oid opOid);
static void
prepareDispatchedCatalogOpClass(QueryContextInfo *ctx, Oid opclass);
static void
prepareDispatchedCatalogAggregate(QueryContextInfo *ctx, Oid aggfuncoid);
static void
prepareDispatchedCatalogFunctionExpr(QueryContextInfo *cxt, Expr *expr);
static void
prepareDispatchedCatalogForAoParquet(QueryContextInfo *cxt, Oid relid,
bool forInsert, List *segnos);
static void
prepareAllDispatchedCatalogTablespace(QueryContextInfo *ctx);
static void
prepareDispatchedCatalogExternalTable(QueryContextInfo *cxt, Oid relid);
static void
prepareDfsAddressForDispatch(QueryContextInfo* cxt);
static void
addOperatorToDispatchContext(QueryContextInfo *ctx, Oid opOid);
static void
WriteData(QueryContextInfo *cxt, const char *buffer, int size);
static void GetExtTableLocationsArray(HeapTuple tuple, TupleDesc exttab_desc,
Datum **array, int *array_size);
static char*
GetExtTableFirstLocation(Datum *array);
static void
AddFileSystemCredentialForPxfTable();
/**
* construct the file location for query context dispatching.
*/
static void
GetQueryContextDispatchingFileLocation(char **buffer)
{
int len;
static int count = 0;
char *path = NULL, *pos;
Oid tablespace = get_database_dts(MyDatabaseId);
GetFilespacePathForTablespace(tablespace, &path);
if (NULL == path)
{
elog(ERROR, "cannot get distributed table space location for current database,"
" table space oid is %u",
tablespace);
}
pos = path + strlen(path) - 1;
*pos = '\0';
len = strlen(path);
len += 64;
*buffer = palloc(len);
snprintf(*buffer, len, "%s-1/session_%d_id_%d_query_context", path, gp_session_id, ++count);
pfree(path);
}
/*
* create a QueryContextInfo and fill in the location of its file on shared storage.
*
* also add auxiliary information such as the master's transaction id into the QueryContextInfo.
*/
QueryContextInfo *
CreateQueryContextInfo(void)
{
QueryContextInfo *retval = makeNode(QueryContextInfo);
retval->htab = createPrepareDispatchedCatalogRelationDisctinctHashTable();
GetQueryContextDispatchingFileLocation(&retval->sharedPath);
AddAuxInfoToQueryContextInfo(retval);
retval->finalized = false;
return retval;
}
/*
* On QE, construct a QueryContextInfo from a file on shared storage.
*/
void
InitQueryContextInfoFromFile(QueryContextInfo *cxt)
{
Assert(NULL != cxt->sharedPath);
cxt->file = PathNameOpenFile(cxt->sharedPath, O_RDONLY, 0);
if (cxt->file < 0)
{
ereport(ERROR,
(errcode_for_file_access(),
errmsg("cannot open file %s %m", cxt->sharedPath),
errdetail("%s", HdfsGetLastError())));
}
}
/*
* finalize a QueryContextInfo: append filesystem credentials if needed, then close its file if one is open.
*/
void
FinalizeQueryContextInfo(QueryContextInfo *cxt)
{
Assert(cxt);
Assert(!cxt->finalized);
if (Debug_querycontext_print && Gp_role != GP_ROLE_EXECUTE)
{
int size = cxt->cursor;
if (cxt->useFile)
{
size = FileNonVirtualTell(cxt->file);
if (size < 0)
{
ereport(ERROR,
(errcode_for_file_access(),
errmsg("failed to get file pointer position"),
errdetail("%s", HdfsGetLastError())));
}
}
elog(LOG, "query context size: %d bytes, passed by %s",
size, (cxt->useFile? "shared storage" : "dispatching"));
}
if (cxt->htab)
hash_destroy(cxt->htab);
if (enable_secure_filesystem && Gp_role != GP_ROLE_EXECUTE)
{
int size;
char *credential;
StringInfoData buffer;
initStringInfo(&buffer);
credential = serialize_filesystem_credentials(&size);
if (credential && size > 0)
{
pq_sendint(&buffer, (int) FileSystemCredential, sizeof(char));
pq_sendint(&buffer, size, sizeof(int));
WriteData(cxt, buffer.data, buffer.len);
WriteData(cxt, credential, size);
}
}
if (cxt->useFile)
{
FileClose(cxt->file);
cxt->file = 0;
}
cxt->finalized = true;
}
/*
* clean up and destroy a QueryContextInfo
*/
void
DropQueryContextInfo(QueryContextInfo *cxt)
{
Assert(NULL != cxt);
if (TestFileValid(cxt->file))
FileClose(cxt->file);
if (cxt->sharedPath)
pfree(cxt->sharedPath);
if (cxt->buffer)
pfree(cxt->buffer);
if (cxt->errTblOid)
list_free(cxt->errTblOid);
pfree(cxt);
}
/*
* add a file to the drop list.
*/
static void
AddToDropList(const char *path)
{
MemoryContext old;
old = MemoryContextSwitchTo(TopTransactionContext);
DropListItem p = palloc(sizeof(DropListItemData));
p->path = palloc(strlen(path) + 1);
strcpy(p->path, path);
p->next = DropList;
DropList = p;
MemoryContextSwitchTo(old);
}
/*
* called at transaction commit/rollback to drop all pending files.
*/
void
AtEOXact_QueryContext(void)
{
DropListItem p, q;
for (p = DropList; NULL != p;)
{
Assert(p->path);
RemovePath(p->path, FALSE);
q = p;
p = p->next;
pfree(q->path);
pfree(q);
}
DropList = NULL;
}
/*
* read from the file until the expected number of bytes has been read or end of file is reached.
* return TRUE on success, FALSE on end of file.
*
* report an error if end of file is reached and errorOnEof is true.
*/
static bool
ReadFully(File file, const char *path, char *buffer, int size,
bool errorOnEof)
{
int done, todo = size;
while (todo > 0)
{
done = FileRead(file, buffer + size - todo, todo);
if (0 == done && errorOnEof)
{
ereport(ERROR,
(errcode(ERRCODE_IO_ERROR),
errmsg("rebuild query context failed: cannot read data due to unexpected eof")));
}
else if (0 == done)
{
return FALSE;
}
else if (done < 0)
{
ereport(ERROR,
(errcode(ERRCODE_IO_ERROR),
errmsg("rebuild query context failed: cannot read data from %s", path),
errdetail("%s", HdfsGetLastError())));
}
todo -= done;
}
return TRUE;
}
/*
* read from the QueryContextInfo until the expected number of bytes has been read or end of file is reached.
* return TRUE on success, FALSE on end of file.
*
* report an error if end of file is reached and errorOnEof is true.
*/
static bool
ReadData(QueryContextInfo *cxt, char *buffer, int size,
bool errorOnEof)
{
Assert(NULL != cxt);
Assert(NULL != buffer);
Assert(size > 0);
if (!cxt->useFile)
{
Assert(cxt->buffer);
if (cxt->size == cxt->cursor && errorOnEof)
{
ereport(ERROR,
(errcode(ERRCODE_IO_ERROR), errmsg("rebuild query context failed: "
"cannot read data due to unexpected eof")));
}
else if (cxt->size == cxt->cursor)
{
return FALSE;
}
else if (cxt->size - cxt->cursor < size)
{
ereport(ERROR,
(errcode(ERRCODE_IO_ERROR), errmsg("rebuild query context failed: "
"cannot read data due to unexpected eof")));
}
memcpy(buffer, cxt->buffer + cxt->cursor, size);
cxt->cursor += size;
return TRUE;
}
else
{
Assert(TestFileValid(cxt->file));
return ReadFully(cxt->file, cxt->sharedPath, buffer, size, errorOnEof);
}
}
/*
* write data into QueryContextInfo
*/
static void
WriteData(QueryContextInfo *cxt, const char *buffer, int size)
{
Assert(NULL != cxt);
Assert(NULL != buffer);
Assert(size > 0);
if (cxt->useFile)
{
Assert(TestFileValid(cxt->file));
if (size != FileWrite(cxt->file, buffer, size))
{
ereport(ERROR,
(errcode(ERRCODE_IO_ERROR),
errmsg("prepare query context failed: cannot write data into file: %s", cxt->sharedPath),
errdetail("%s", HdfsGetLastError())));
}
return;
}
/* switch to file*/
if (false /*disable this feature*/ && cxt->cursor + size > QueryContextDispatchingSizeMemoryLimit *1024)
{
elog(LOG, "Switch to use file on shared storage: %s "
"instead of dispatching the query context "
"since memory usage exceeds the gp_query_context_mem_limit", cxt->sharedPath);
cxt->useFile = TRUE;
char *p;
char *dir = palloc(strlen(cxt->sharedPath) + 1);
strcpy(dir, cxt->sharedPath);
p = strrchr(dir, '/');
if (NULL == p)
{
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR), errmsg("failed to construct"
" query context directory, file path: %s", cxt->sharedPath)));
}
*p = 0;
MakeDirectory(dir, 0700);
cxt->file = PathNameOpenFile(cxt->sharedPath, O_CREAT | O_WRONLY, 0500);
if (cxt->file < 0)
{
ereport(ERROR,
(errcode_for_file_access(),
errmsg("cannot create file %s %m", cxt->sharedPath),
errdetail("%s", HdfsGetLastError())));
}
AddToDropList(cxt->sharedPath);
if (cxt->cursor > 0)
{
if (cxt->cursor != FileWrite(cxt->file, cxt->buffer, cxt->cursor))
{
ereport(ERROR,
(errcode(ERRCODE_IO_ERROR),
errmsg("prepare query context failed: cannot write data into file: %s", cxt->sharedPath),
errdetail("%s", HdfsGetLastError())));
}
}
if (cxt->buffer)
pfree(cxt->buffer);
cxt->buffer = NULL;
cxt->size = cxt->cursor = 0;
WriteData(cxt, buffer, size);
}
else
{
if (cxt->cursor + size > cxt->size)
{
cxt->size = (1 + cxt->size) * 2;
if (cxt->size < cxt->cursor + size)
cxt->size = cxt->cursor + size;
if (cxt->buffer)
cxt->buffer = repalloc(cxt->buffer, cxt->size);
else
cxt->buffer = palloc(cxt->size);
}
memcpy(cxt->buffer + cxt->cursor, buffer, size);
cxt->cursor += size;
}
}
/*
* read the master's transaction id and set it on the QE
*/
static void
RebuildMasterTransactionId(QueryContextInfo *cxt)
{
Assert(Gp_role == GP_ROLE_EXECUTE);
TransactionId masterXid;
char buffer[4];
ReadData(cxt, buffer, sizeof(buffer), TRUE);
masterXid = (TransactionId) ntohl(*(uint32 *)buffer);
SetMasterTransactionId(masterXid);
}
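/*
* deserialize the filesystem credentials dispatched from the master and
* install them for the current query on the QE.
*/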
static void
RebuildFilesystemCredentials(QueryContextInfo *cxt, HTAB ** currentFilesystemCredentials,
MemoryContext *currentFilesystemCredentialsMemoryContext)
{
Assert(Gp_role == GP_ROLE_EXECUTE);
int len;
char buffer[4], *binary;
Insist(enable_secure_filesystem);
ReadData(cxt, buffer, sizeof(buffer), TRUE);
len = (int) ntohl(*(uint32 *)buffer);
binary = palloc(len);
ReadData(cxt, binary, len, TRUE);
deserialize_filesystem_credentials(binary, len, currentFilesystemCredentials,
currentFilesystemCredentialsMemoryContext);
pfree(binary);
}
/*
* build a new empty in-memory table for the given relid.
*/
static void
RebuildEmptyTable(QueryContextInfo *cxt)
{
Oid relid;
char buffer[4];
Relation rel;
InMemHeapRelation inmemrel;
ReadData(cxt, buffer, sizeof(buffer), TRUE);
relid = (Oid) ntohl(*(uint32 *)buffer);
inmemrel = OidGetInMemHeapRelation(relid, INMEM_HEAP_MAPPING);
Assert(NULL == inmemrel);
rel = heap_open(relid, AccessShareLock);
inmemrel = InMemHeap_Create(relid, rel, TRUE, 10, AccessShareLock,
RelationGetRelationName(rel), FALSE, 0, INMEM_HEAP_MAPPING);
Assert(NULL != inmemrel);
}
/*
* get and set a tablespace location on QE.
*/
static void
RebuildTablespaceLocation(QueryContextInfo *cxt)
{
Assert(Gp_role == GP_ROLE_EXECUTE);
int32 len;
Oid tspoid;
char location[FilespaceLocationBlankPaddedWithNullTermLen];
char *buffer, *loc;
buffer = palloc(sizeof(int32));
ReadData(cxt, buffer, sizeof(int32), TRUE);
len = (int32) ntohl(*(uint32 *)buffer);
buffer = repalloc(buffer, len);
ReadData(cxt, buffer, len, TRUE);
tspoid = (Oid) ntohl(*(uint32 *)buffer);
loc = buffer + sizeof(Oid);
snprintf(location, FilespaceLocationBlankPaddedWithNullTermLen,
buffer + sizeof(Oid), 0);
DispatchedFilespace_AddForTablespace(tspoid, location);
pfree(buffer);
}
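/*
* decide whether the in-memory copy of a catalog relation should support
* index scan, and if so on which attribute.
*/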
static void
EnableInMemHeapWithIndexScan(Oid relid, bool *enable, int *attrno)
{
Assert(NULL != enable && NULL != attrno);
switch(relid)
{
case AttributeRelationId:
*enable = true;
*attrno = Anum_pg_attribute_attrelid;
break;
default:
*enable = false;
*attrno = 0;
}
}
/*
* read a tuple and insert it into the in-memory table.
*/
static void
RebuildTupleForRelation(QueryContextInfo *cxt)
{
Assert(Gp_role == GP_ROLE_EXECUTE);
Oid relid;
int32 len, contentid, tuplen;
char *buffer;
Relation rel;
InMemHeapRelation inmemrel;
HeapTuple tuple;
buffer = palloc(sizeof(int32));
ReadData(cxt, buffer, sizeof(int32), TRUE);
len = (int32) ntohl(*(uint32*)buffer);
ReadData(cxt, buffer, sizeof(int32), TRUE);
relid = (Oid) ntohl(*(uint32*)buffer);
ReadData(cxt, buffer, sizeof(int32), TRUE);
contentid = (int32) ntohl(*(uint32*)buffer);
tuplen = len - sizeof(Oid) - sizeof(int32);
Assert(tuplen > 0);
buffer = repalloc(buffer, tuplen);
ReadData(cxt, buffer, tuplen, TRUE);
if (contentid == MASTER_CONTENT_ID || contentid == GpIdentity.segindex)
{
tuple = (HeapTuple) buffer;
tuple->t_data = (HeapTupleHeader) ((char *) tuple
+ sizeof(HeapTupleData));
inmemrel = OidGetInMemHeapRelation(relid, INMEM_HEAP_MAPPING);
if (NULL == inmemrel)
{
bool indexOk;
int attrno;
rel = heap_open(relid, AccessShareLock);
EnableInMemHeapWithIndexScan(relid, &indexOk, &attrno);
inmemrel = InMemHeap_Create(relid, rel, TRUE, 10, AccessShareLock,
RelationGetRelationName(rel), indexOk, attrno, INMEM_HEAP_MAPPING);
Assert(NULL != inmemrel);
}
InMemHeap_Insert(inmemrel, tuple, contentid);
if (Debug_querycontext_print)
{
elog( LOG, "Query Context: rebuild tuple for relation %s, relid %u:",
inmemrel->relname, inmemrel->relid);
}
if (Debug_querycontext_print_tuple)
{
elog( LOG, "rebuild query context tuple for relation:"
" %s, total bytes: %d",inmemrel->relname,
(int)(len + sizeof(char) + sizeof(int32)));
}
pfree(buffer);
}
}
/*
* Deserialize the Namespace data
*/
static void
RebuildNamespace(QueryContextInfo *cxt)
{
MemoryContext oldContext;
int len;
char buffer[4], *binary;
oldContext = MemoryContextSwitchTo(MessageContext);
ReadData(cxt, buffer, sizeof(buffer), TRUE);
len = (int) ntohl(*(uint32 *) buffer);
binary = palloc(len);
if(ReadData(cxt, binary, len, TRUE))
{
dfs_address = pstrdup(binary);
} else {
pfree(binary);
MemoryContextSwitchTo(oldContext);
elog(ERROR, "Couldn't rebuild Namespace");
}
pfree(binary);
MemoryContextSwitchTo(oldContext);
}
/*
* rebuild the query context on the executor (QE)
*/
void
RebuildQueryContext(QueryContextInfo *cxt, HTAB **currentFilesystemCredentials,
MemoryContext *currentFilesystemCredentialsMemoryContext)
{
char type;
while (TRUE)
{
if (!ReadData(cxt, &type, sizeof(char), FALSE))
break;
switch (type)
{
case MasterXid:
RebuildMasterTransactionId(cxt);
break;
case TablespaceLocation:
RebuildTablespaceLocation(cxt);
break;
case TupleType:
RebuildTupleForRelation(cxt);
break;
case EmptyTable:
RebuildEmptyTable(cxt);
break;
case FileSystemCredential:
RebuildFilesystemCredentials(cxt, currentFilesystemCredentials,
currentFilesystemCredentialsMemoryContext);
break;
case Namespace:
RebuildNamespace(cxt);
break;
default:
ereport(ERROR,
(errcode(ERRCODE_GP_INTERNAL_ERROR), errmsg( "unrecognized "
"QueryContextDispatchingItemType %d", (int) type)));
}
}
cxt->finalized = false;
/*
* This is a bit overkill, but since we don't yet have a decent way to
* determine individual cache entries affected by the dispatched
* metadata, we should release all the system cache here. Our good-old
* shared invalidation message is not necessarily applicable here;
* backends in segments never change the metadata, and we only care about
* metadata changes within the session.
*/
ResetSystemCaches();
}
/*
* collect QD's transaction id and other global variables
*/
void
AddAuxInfoToQueryContextInfo(QueryContextInfo *cxt)
{
Assert(NULL != cxt);
Assert(!cxt->finalized);
StringInfoData buffer;
initStringInfo(&buffer);
/*
* add the master's transaction id. It will be used by external tables
*/
pq_sendint(&buffer, (int) MasterXid, sizeof(char));
pq_sendint(&buffer, GetMasterTransactionId(), sizeof(TransactionId));
/*
* more auxiliary information may be added here
*/
WriteData(cxt, buffer.data, buffer.len);
pfree(buffer.data);
}
/*
* in some special cases, such as pg_aoseg_xxx,
* a heap relation acts as a catalog
* and can be empty.
*
* if it is empty, send this flag to the QE to build an empty in-memory table.
*/
void
AddEmptyTableFlagToContextInfo(QueryContextInfo *cxt, Oid relid)
{
StringInfoData buffer;
initStringInfo(&buffer);
pq_sendint(&buffer, EmptyTable, sizeof(char));
pq_sendint(&buffer, relid, sizeof(Oid));
WriteData(cxt, buffer.data, buffer.len);
pfree(buffer.data);
}
/*
* collect tuples for one catalog table
*/
void
AddTupleToContextInfo(QueryContextInfo *cxt, Oid relid,
const char *relname, HeapTuple tuple, int32 contentid)
{
Assert(NULL != cxt);
Assert(!cxt->finalized);
StringInfoData header;
initStringInfo(&header);
int32 tuplen = sizeof(HeapTupleData) + tuple->t_len;
/* send one byte flag */
pq_sendint(&header, TupleType, sizeof(char));
/* send content length, sizeof(relid) + sizeof(contentid) + tuple */
int32 len = sizeof(Oid) + sizeof(int32) + tuplen;
pq_sendint(&header, len, sizeof(int32));
/* send relid */
pq_sendint(&header, relid, sizeof(Oid));
pq_sendint(&header, contentid, sizeof(int32));
WriteData(cxt, header.data, header.len);
WriteData(cxt, (const char *) tuple, sizeof(HeapTupleData));
WriteData(cxt, (const char *) tuple->t_data, tuple->t_len);
pfree(header.data);
if (Debug_querycontext_print_tuple)
{
elog(LOG, "query context dispatching tuple for relation: %s, "
"total bytes: %d", relname, (int)(len + sizeof(char) + sizeof(int32)));
}
}
/*
* collect tuple and related toasted tuples for one catalog table
*/
void
AddTupleWithToastsToContextInfo(QueryContextInfo *cxt, Oid relid,
const char *relname, HeapTuple tuple, int32 contentid)
{
Relation rel, toastrel, toastidx;
ScanKeyData toastkey;
IndexScanDesc toastscan;
HeapTuple ttup;
TupleDesc tupdesc, toasttupDesc;
Form_pg_attribute *att;
Datum *d;
bool * null;
int32 nextidx, numchunks;
char toast_relname[NAMEDATALEN];
Assert(NULL != cxt);
// first add this tuple
AddTupleToContextInfo(cxt, relid, relname, tuple, contentid);
// add related toast tuples
rel = heap_open(relid, AccessShareLock);
Assert(NULL != rel);
tupdesc = rel->rd_att;
att = tupdesc->attrs;
d = (Datum *)palloc(sizeof(Datum)*tupdesc->natts);
null = (bool *)palloc(sizeof(bool)*tupdesc->natts);
heap_deform_tuple(tuple, tupdesc, d, null);
snprintf(toast_relname, sizeof(toast_relname), "pg_toast_%u", relid);
for (uint i = 0; i < tupdesc->natts; i++)
{
if (att[i]->attisdropped)
continue;
if (att[i]->attlen == -1)
{
if(!null[i]){
struct varlena *v=(struct varlena *) DatumGetPointer(d[i]);
if(VARATT_IS_EXTENDED(v) && VARATT_IS_EXTERNAL(v)){
struct varatt_external *va_ext = copy_varatt_external(v);
numchunks = ((va_ext->va_extsize - 1) / TOAST_MAX_CHUNK_SIZE) + 1;
/*
* Open the toast relation and its index
*/
toastrel = heap_open(va_ext->va_toastrelid, AccessShareLock);
toasttupDesc = toastrel->rd_att;
toastidx = index_open(toastrel->rd_rel->reltoastidxid, AccessShareLock);
/*
* Setup a scan key to fetch from the index by va_valueid
*/
ScanKeyInit(&toastkey,
(AttrNumber) 1,
BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(va_ext->va_valueid));
/*
* Read the chunks by index
*
* Note that because the index is actually on (valueid, chunkidx) we will
* see the chunks in chunkidx order, even though we didn't explicitly ask
* for it.
*/
nextidx = 0;
toastscan = index_beginscan(toastrel, toastidx,
SnapshotToast, 1, &toastkey);
while ((ttup = index_getnext(toastscan, ForwardScanDirection)) != NULL)
{
AddTupleToContextInfo(cxt, va_ext->va_toastrelid, toast_relname, ttup, contentid);
nextidx ++;
}
/*
* Final checks that we successfully fetched the datum
*/
if (nextidx != numchunks)
elog(ERROR, "missing chunk number %d for toast value %u",
nextidx, va_ext->va_valueid);
/*
* End scan and close relations
*/
index_endscan(toastscan);
index_close(toastidx, AccessShareLock);
heap_close(toastrel, AccessShareLock);
pfree(va_ext);
}
}
}
}
pfree(d);
pfree(null);
heap_close(rel, AccessShareLock);
}
/*
* collect tablespace location.
*/
void
AddTablespaceLocationToContextInfo(QueryContextInfo *cxt, Oid tspoid,
const char *fmt)
{
Assert(NULL != cxt);
Assert(!cxt->finalized);
StringInfoData header;
initStringInfo(&header);
int len = strlen(fmt);
pq_sendint(&header, TablespaceLocation, sizeof(char));
pq_sendint(&header, sizeof(Oid) + len + 1, sizeof(int32));
pq_sendint(&header, tspoid, sizeof(Oid));
WriteData(cxt, header.data, header.len);
WriteData(cxt, fmt, len + 1);
pfree(header.data);
}
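/*
* check whether the (objid, type) pair was already added to the dispatch
* hash table; if not, insert it. returns true when it was already present.
*/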
static bool
alreadyAddedForDispatching(HTAB *rels, Oid objid, QueryContextDispatchingObjType type) {
bool found = false;
Assert(NULL != rels);
QueryContextDispatchingHashKey item;
item.objid = objid;
item.type = type;
hash_search(rels, &item, HASH_ENTER, &found);
return found;
}
/*
* collect the pg_namespace tuple for the given oid.
* add it to the in-memory heap table for the dispatcher
*/
void
prepareDispatchedCatalogNamespace(QueryContextInfo *cxt, Oid namespace)
{
HeapTuple tuple;
if (namespace == PG_CATALOG_NAMESPACE || namespace == PG_TOAST_NAMESPACE
|| namespace == PG_BITMAPINDEX_NAMESPACE
|| namespace == PG_PUBLIC_NAMESPACE
|| namespace == PG_AOSEGMENT_NAMESPACE)
return;
if (alreadyAddedForDispatching(cxt->htab, namespace, NamespaceType))
return;
tuple = SearchSysCache(NAMESPACEOID, ObjectIdGetDatum(namespace), 0, 0, 0);
if (!HeapTupleIsValid(tuple))
elog(ERROR, "cache lookup failed for namespace %u", namespace);
AddTupleWithToastsToContextInfo(cxt, NamespaceRelationId, "pg_namespace", tuple,
MASTER_CONTENT_ID);
ReleaseSysCache(tuple);
}
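/*
* on QD, obtain a filesystem credential for the given path when secure
* filesystem access is enabled.
*/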
static void
prepareDispatchedCatalogFileSystemCredential(const char *path)
{
if (!enable_secure_filesystem)
return;
Insist(Gp_role != GP_ROLE_EXECUTE);
add_filesystem_credential(path);
}
/*
* collect the tablespace location for the given tablespace oid.
* add it to the query context for the dispatcher
*/
void
prepareDispatchedCatalogTablespace(QueryContextInfo *cxt, Oid tablespace)
{
char *path = NULL, *pos;
char location[FilespaceLocationBlankPaddedWithNullTermLen];
if (IsBuiltinTablespace(tablespace) || tablespace == InvalidOid )
return;
if (alreadyAddedForDispatching(cxt->htab, tablespace, TablespaceType))
return;
GetFilespacePathForTablespace(tablespace, &path);
Assert(NULL != path);
Assert(strlen(path) < FilespaceLocationBlankPaddedWithNullTermLen);
prepareDispatchedCatalogFileSystemCredential(path);
pos = path + strlen(path) - 1;
// Assert('0' == *pos);
*(pos+1) = '\0';
snprintf(location, FilespaceLocationBlankPaddedWithNullTermLen,
"%s", path);
pfree(path);
AddTablespaceLocationToContextInfo(cxt, tablespace, location);
}
/*
* collect the locations of all non-builtin tablespaces.
* add them to the query context for the dispatcher.
*/
static void
prepareAllDispatchedCatalogTablespace(QueryContextInfo *ctx)
{
Relation rel;
HeapScanDesc scandesc;
HeapTuple tuple;
/* Scan through all tablespaces */
rel = heap_open(TableSpaceRelationId, AccessShareLock);
scandesc = heap_beginscan(rel, SnapshotNow, 0, NULL);
tuple = heap_getnext(scandesc, ForwardScanDirection);
while (HeapTupleIsValid(tuple))
{
Oid tsOid;
tsOid = HeapTupleGetOid(tuple);
/* don't include builtin tablespaces */
if (!IsBuiltinTablespace(tsOid))
{
prepareDispatchedCatalogTablespace(ctx, tsOid);
}
tuple = heap_getnext(scandesc, ForwardScanDirection);
}
heap_endscan(scandesc);
heap_close(rel, AccessShareLock);
return;
}
/*
* check if a type is a composite type,
* and dispatch its underlying relation info if it is.
*/
static void
prepareDispatchedCatalogCompositeType(QueryContextInfo *cxt,
Oid typeid)
{
HeapTuple tuple;
Form_pg_type attr;
Assert(OidIsValid(typeid));
tuple = SearchSysCache(TYPEOID, ObjectIdGetDatum(typeid), 0, 0, 0);
if (!HeapTupleIsValid(tuple))
return;
attr = (Form_pg_type) GETSTRUCT(tuple);
if (attr->typtype == TYPTYPE_COMPOSITE)
prepareDispatchedCatalogSingleRelation(cxt, attr->typrelid, FALSE, 0);
ReleaseSysCache(tuple);
}
/*
* collect relation's attribute info for dispatching
*/
static void
prepareDispatchedCatalogAttribute(QueryContextInfo *cxt,
Oid reloid)
{
Relation rel;
HeapTuple attrtuple;
ScanKeyData skey;
SysScanDesc scandesc;
Form_pg_attribute attr;
rel = heap_open(AttributeRelationId, AccessShareLock);
Assert(NULL != rel);
ScanKeyInit(&skey, Anum_pg_attribute_attrelid, BTEqualStrategyNumber,
F_OIDEQ, ObjectIdGetDatum(reloid));
scandesc = systable_beginscan(rel, AttributeAttrelidIndexId, TRUE, SnapshotNow, 1,
&skey);
attrtuple = systable_getnext(scandesc);
while (HeapTupleIsValid(attrtuple))
{
attr = (Form_pg_attribute) GETSTRUCT(attrtuple);
if (attr->attnum > 0)
{
/* check attribute type */
if (OidIsValid(attr->atttypid))
prepareDispatchedCatalogCompositeType(cxt, attr->atttypid);
}
AddTupleToContextInfo(cxt, AttributeRelationId, "pg_attribute",
attrtuple, MASTER_CONTENT_ID);
attrtuple = systable_getnext(scandesc);
}
systable_endscan(scandesc);
heap_close(rel, AccessShareLock);
}
/*
* collect relation's default values for dispatching
*/
static void
prepareDispatchedCatalogAttributeDefault(QueryContextInfo *cxt,
Oid reloid)
{
Relation rel;
Datum value;
HeapTuple attrdeftuple;
ScanKeyData skey;
SysScanDesc scandesc;
rel = heap_open(AttrDefaultRelationId, AccessShareLock);
ScanKeyInit(&skey, Anum_pg_attrdef_adrelid, BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(reloid));
scandesc = systable_beginscan(rel, InvalidOid, FALSE, SnapshotNow, 1,
&skey);
attrdeftuple = systable_getnext(scandesc);
while (HeapTupleIsValid(attrdeftuple))
{
/*
* check for sequence
*/
Expr *expr = NULL;
bool isNull;
value = fastgetattr(attrdeftuple, Anum_pg_attrdef_adbin,
RelationGetDescr(rel), &isNull);
Assert(FALSE == isNull);
expr = stringToNode(TextDatumGetCString(value));
if (expr)
{
prepareDispatchedCatalogFunctionExpr(cxt, expr);
pfree(expr);
}
AddTupleWithToastsToContextInfo(cxt, AttrDefaultRelationId, "pg_attrdef",
attrdeftuple, MASTER_CONTENT_ID);
attrdeftuple = systable_getnext(scandesc);
}
systable_endscan(scandesc);
heap_close(rel, AccessShareLock);
}
/*
* collect relation's attribute encoding for dispatching
*/
static void
prepareDispatchedCatalogAttributeEncoding(QueryContextInfo *cxt,
Oid reloid)
{
Relation rel;
HeapTuple tuple;
ScanKeyData skey;
SysScanDesc scandesc;
Form_pg_attribute_encoding attr;
rel = heap_open(AttributeEncodingRelationId, AccessShareLock);
Assert(NULL != rel);
ScanKeyInit(&skey, Anum_pg_attribute_encoding_attrelid, BTEqualStrategyNumber,
F_OIDEQ, ObjectIdGetDatum(reloid));
scandesc = systable_beginscan(rel, AttributeEncodingAttrelidIndexId, TRUE, SnapshotNow, 1,
&skey);
tuple = systable_getnext(scandesc);
while (HeapTupleIsValid(tuple))
{
attr = (Form_pg_attribute_encoding) GETSTRUCT(tuple);
AddTupleWithToastsToContextInfo(cxt, AttributeEncodingRelationId, "pg_attribute_encoding",
tuple, MASTER_CONTENT_ID);
tuple = systable_getnext(scandesc);
}
systable_endscan(scandesc);
heap_close(rel, AccessShareLock);
}
/*
* collect relation's type info for dispatching
*/
static void
prepareDispatchedCatalogTypeByRelation(QueryContextInfo *cxt,
Oid relid, HeapTuple classtuple)
{
Datum typeid;
HeapTuple typetuple;
typeid = SysCacheGetAttr(RELOID, classtuple, Anum_pg_class_reltype, NULL );
if (InvalidOid == DatumGetObjectId(typeid))
elog(ERROR, "reltype in pg_class for %u is invalid", relid);
typetuple = SearchSysCache(TYPEOID, typeid, 0, 0, 0);
AddTupleWithToastsToContextInfo(cxt, TypeRelationId, "pg_type", typetuple,
MASTER_CONTENT_ID);
/*
* TODO:
* 1) to check typelem for array type
* 2) to check referenced tuple in pg_proc
* 3) to check text type attributes
*/
ReleaseSysCache(typetuple);
}
/*
* collect pg_constraint for metadata dispatching
*/
static void
prepareDispatchedCatalogConstraint(QueryContextInfo *cxt,
Oid relid)
{
Relation rel;
ScanKeyData skey;
SysScanDesc scandesc;
HeapTuple constuple;
rel = heap_open(ConstraintRelationId, AccessShareLock);
ScanKeyInit(&skey, Anum_pg_constraint_conrelid, BTEqualStrategyNumber,
F_OIDEQ, ObjectIdGetDatum(relid));
scandesc = systable_beginscan(rel, InvalidOid, FALSE, SnapshotNow, 1,
&skey);
constuple = systable_getnext(scandesc);
while (HeapTupleIsValid(constuple))
{
AddTupleWithToastsToContextInfo(cxt, ConstraintRelationId, "pg_constraint",
constuple, MASTER_CONTENT_ID);
constuple = systable_getnext(scandesc);
}
systable_endscan(scandesc);
heap_close(rel, AccessShareLock);
}
/*
* parse fast_sequence for dispatch
*/
static void
prepareDispatchedCatalogFastSequence(QueryContextInfo *cxt, Oid relid,
List *segnos)
{
}
/*
* collect pg_class/pg_type/pg_attribute tuples for the given relid.
* add them to the in-memory heap table for the dispatcher.
*/
void
prepareDispatchedCatalogSingleRelation(QueryContextInfo *cxt, Oid relid,
bool forInsert, List *segnos)
{
HeapTuple classtuple;
Datum namespace, tablespace, toastrelid, relkind, relstorage, relname;
Assert(relid != InvalidOid);
/*
* builtin object, dispatch nothing
*/
if (relid < FirstNormalObjectId)
return;
if (alreadyAddedForDispatching(cxt->htab, relid, RelationType))
{
if (forInsert)
{
/*
* We will be here whenever the same relation is used in
* an INSERT clause and a FROM clause (GPSQL-872).
*/
prepareDispatchedCatalogFastSequence(cxt, relid, segnos);
}
return;
}
/* find relid in pg_class */
classtuple = SearchSysCache(RELOID, ObjectIdGetDatum(relid), 0, 0, 0);
if (!HeapTupleIsValid(classtuple))
elog(ERROR, "cache lookup failed for relation %u", relid);
if (Debug_querycontext_print)
{
Form_pg_class tup = (Form_pg_class) GETSTRUCT(classtuple);
elog(LOG, "Query Context: prepare object: relid = %u, relname = %s, "
"relnamespace = %u, relfilenode = %u, relkind = %c",
relid, tup->relname.data, tup->relnamespace,
tup->relfilenode, tup->relkind);
}
AddTupleWithToastsToContextInfo(cxt, RelationRelationId, "pg_class", classtuple,
MASTER_CONTENT_ID);
/* collect pg_namespace info */
namespace = SysCacheGetAttr(RELOID, classtuple, Anum_pg_class_relnamespace,
NULL );
if (InvalidOid == DatumGetObjectId(namespace))
elog(ERROR, "relnamespace field in pg_class of %u is invalid", relid);
prepareDispatchedCatalogNamespace(cxt, DatumGetObjectId(namespace));
/* collect pg_tablespace info */
tablespace = SysCacheGetAttr(RELOID, classtuple,
Anum_pg_class_reltablespace, NULL );
/* collect toast info */
toastrelid = SysCacheGetAttr(RELOID, classtuple,
Anum_pg_class_reltoastrelid, NULL );
if (InvalidOid != DatumGetObjectId(toastrelid))
{
/*TODO*/
Insist(!"cannot handle toast table right now!");
}
/* collect pg_type info */
prepareDispatchedCatalogTypeByRelation(cxt, relid, classtuple);
/* collect pg_attribute info */
prepareDispatchedCatalogAttribute(cxt, relid);
/* collect pg_attrdef info */
/* Only INSERT statement will use column default value*/
if(forInsert){
prepareDispatchedCatalogAttributeDefault(cxt, relid);
}
/* collect pg_attribute_encoding info */
prepareDispatchedCatalogAttributeEncoding(cxt, relid);
/* collect pg_constraint info */
prepareDispatchedCatalogConstraint(cxt, relid);
relkind = SysCacheGetAttr(RELOID, classtuple, Anum_pg_class_relkind, NULL);
relstorage = SysCacheGetAttr(RELOID, classtuple, Anum_pg_class_relstorage,
NULL);
relname = SysCacheGetAttr(RELOID, classtuple, Anum_pg_class_relname,
NULL);
/* The dfs tablespace oid must be dispatched. */
if (DatumGetObjectId(tablespace) == InvalidOid && relstorage_is_ao(relstorage))
prepareDispatchedCatalogTablespace(cxt, get_database_dts(MyDatabaseId));
else
prepareDispatchedCatalogTablespace(cxt, DatumGetObjectId(tablespace));
switch (DatumGetChar(relstorage))
{
case RELSTORAGE_AOROWS:
case RELSTORAGE_PARQUET:
prepareDispatchedCatalogForAoParquet(cxt, relid, forInsert, segnos);
break;
case RELSTORAGE_EXTERNAL:
prepareDispatchedCatalogExternalTable(cxt, relid);
break;
case RELSTORAGE_VIRTUAL:
break;
case RELSTORAGE_HEAP:
/* support catalog tables (gp_dist_random et al.) */
break;
case RELSTORAGE_FOREIGN:
/* TODO */
elog(ERROR, "not implemented relstorage: %c, relname: %s",
DatumGetChar(relstorage), DatumGetCString(relname));
break;
default:
Insist(!"never get here");
break;
}
ReleaseSysCache(classtuple);
}
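/*
* look up the segrelid of an append-only relation under a snapshot that
* includes the current command; the caller compares it against the
* SnapshotNow value to detect concurrent changes (e.g. a concurrent ALTER).
*/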
static Oid
getActiveSegrelid(Oid relid)
{
Relation pg_appendonly_rel;
TupleDesc pg_appendonly_dsc;
HeapTuple tuple;
ScanKeyData key;
SysScanDesc aoscan;
Oid retval;
Datum temp;
Snapshot saveSnapshot = ActiveSnapshot;
ActiveSnapshot = CopySnapshot(ActiveSnapshot);
ActiveSnapshot->curcid = GetCurrentCommandId();
pg_appendonly_rel = heap_open(AppendOnlyRelationId, AccessShareLock);
pg_appendonly_dsc = RelationGetDescr(pg_appendonly_rel);
ScanKeyInit(&key, Anum_pg_appendonly_relid, BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(relid));
aoscan = systable_beginscan(pg_appendonly_rel, AppendOnlyRelidIndexId, TRUE,
ActiveSnapshot, 1, &key);
tuple = systable_getnext(aoscan);
if (!HeapTupleIsValid(tuple))
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT), errmsg("missing pg_appendonly"
" entry for relation \"%s\"", get_rel_name(relid))));
temp = heap_getattr(tuple, Anum_pg_appendonly_segrelid, pg_appendonly_dsc,
NULL );
retval = DatumGetObjectId(temp);
/* Finish up scan and close pg_appendonly catalog. */
systable_endscan(aoscan);
heap_close(pg_appendonly_rel, AccessShareLock);
/* Restore the old snapshot */
ActiveSnapshot = saveSnapshot;
return retval;
}
/*
* parse pg_appendonly for dispatch
*/
static void
prepareDispatchedCatalogGpAppendOnly(QueryContextInfo *cxt,
Oid relid, Oid *segrelid, Oid *segidxid)
{
Relation pg_appendonly_rel;
TupleDesc pg_appendonly_dsc;
HeapTuple tuple;
ScanKeyData key;
SysScanDesc aoscan;
Datum temp;
/*
* used to check if the current transaction can be serialized
*/
Oid activeSegrelid = getActiveSegrelid(relid);
/*
* Check the pg_appendonly relation to be certain the ao table
* is there.
*/
pg_appendonly_rel = heap_open(AppendOnlyRelationId, AccessShareLock);
pg_appendonly_dsc = RelationGetDescr(pg_appendonly_rel);
ScanKeyInit(&key, Anum_pg_appendonly_relid, BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(relid));
aoscan = systable_beginscan(pg_appendonly_rel, AppendOnlyRelidIndexId, TRUE,
SnapshotNow, 1, &key);
tuple = systable_getnext(aoscan);
if (!HeapTupleIsValid(tuple))
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT), errmsg("missing pg_appendonly"
" entry for relation \"%s\"", get_rel_name(relid))));
AddTupleWithToastsToContextInfo(cxt, AppendOnlyRelationId, "pg_appendonly", tuple,
MASTER_CONTENT_ID);
temp = heap_getattr(tuple, Anum_pg_appendonly_segrelid, pg_appendonly_dsc,
NULL );
*segrelid = DatumGetObjectId(temp);
if (*segrelid != activeSegrelid)
{
/*
* the pg_aoseg table for this relation was changed by another concurrent transaction,
* which usually happens with a concurrent alter statement.
* abort this transaction
*/
elog(ERROR, "could not serialize access due to read/write/alter dependencies among transactions");
}
temp = heap_getattr(tuple, Anum_pg_appendonly_segidxid, pg_appendonly_dsc,
NULL );
*segidxid = DatumGetObjectId(temp);
if (Debug_querycontext_print)
{
elog(LOG, "Query Context: prepare pg_appendonly for rel = %u, "
"segrelid = %u", relid, *segrelid);
}
/* Finish up scan and close pg_appendonly catalog. */
systable_endscan(aoscan);
heap_close(pg_appendonly_rel, AccessShareLock);
}
/*
* parse an AO/Parquet relation range table entry and collect metadata used by the QE.
* add it to the in-memory heap table for the dispatcher.
*/
static void
prepareDispatchedCatalogForAoParquet(QueryContextInfo *cxt, Oid relid,
bool forInsert, List *segnos)
{
Relation rel;
Oid segrelid, segidxid;
QueryContextDispatchingHashKey hkey;
QueryContextDispatchingHashEntry *hentry = NULL;
bool found;
rel = heap_open(relid, AccessShareLock);
Assert((RelationIsAoRows(rel) || RelationIsParquet(rel)));
/*
* add tuple in pg_appendonly
*/
prepareDispatchedCatalogGpAppendOnly(cxt, RelationGetRelid(rel), &segrelid,
&segidxid);
Assert(segrelid != InvalidOid);
hkey.objid = relid;
hkey.type = RelationType;
hentry = hash_search(cxt->htab, &hkey, HASH_FIND, &found);
Assert(found);
Assert(hentry);
/*
* Save pg_aoseg_* / pg_aocsseg_* relid in HTAB. We may need it
* later, e.g. to add gp_fastsequence tuple to dispatch context.
* HTAB lookups are much faster than scanning pg_appendonly
* catalog table.
*/
hentry->aoseg_relid = segrelid;
/*
* add pg_aoseg_XXX/pg_paqseg_XXX's metadata
*/
prepareDispatchedCatalogSingleRelation(cxt, segrelid, FALSE, 0);
/*
* Although we have segrelid available here, we do not pass it to
* prepareDispatchedCatalogFastSequence(). We record it in hash
* table instead and look it up from the hash table inside the
* function. This is redundant but necessary to be able to call
* this function from elsewhere. This change is made as part of
* fix for GPSQL-872.
*/
if (forInsert)
prepareDispatchedCatalogFastSequence(cxt, relid, segnos);
heap_close(rel, AccessShareLock);
}
/*
* collect pg_exttable metadata for an external table.
* add it to the in-memory heap table for the dispatcher.
*/
static void
prepareDispatchedCatalogExternalTable(QueryContextInfo *cxt,
Oid relid)
{
Relation pg_exttable_rel;
TupleDesc pg_exttable_dsc;
HeapTuple tuple;
ScanKeyData key;
SysScanDesc extscan;
Datum *array;
int array_size;
char *location = NULL;
Relation rel;
rel = heap_open(relid, AccessShareLock);
pg_exttable_rel = heap_open(ExtTableRelationId, RowExclusiveLock);
pg_exttable_dsc = RelationGetDescr(pg_exttable_rel);
ScanKeyInit(&key, Anum_pg_exttable_reloid, BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(rel->rd_id));
extscan = systable_beginscan(pg_exttable_rel, ExtTableReloidIndexId, true,
SnapshotNow, 1, &key);
tuple = systable_getnext(extscan);
if (!HeapTupleIsValid(tuple))
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT), errmsg("missing "
"pg_exttable entry for relation \"%s\"", RelationGetRelationName(rel))));
GetExtTableLocationsArray(tuple, pg_exttable_dsc,
&array, &array_size);
location = GetExtTableFirstLocation(array);
Insist(location != NULL);
if (IS_PXF_URI(location))
{
Insist(array_size == 1);
AddFileSystemCredentialForPxfTable();
prepareDfsAddressForDispatch(cxt);
}
AddTupleWithToastsToContextInfo(cxt, ExtTableRelationId, "pg_exttable", tuple,
MASTER_CONTENT_ID);
systable_endscan(extscan);
heap_close(pg_exttable_rel, RowExclusiveLock);
heap_close(rel, AccessShareLock);
}
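/*
* expression/plan tree walker that collects catalog metadata (types,
* functions, operators, aggregates and sequences) referenced by the
* expression and adds it to the query context.
*/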
static bool collect_func_walker(Node *node, QueryContextInfo *context)
{
if (node == NULL)
return false;
if(IsA(node, SubPlan))
{
SubPlan *subplan = (SubPlan *) node;
if (subplan->testexpr)
{
return expression_tree_walker(subplan->testexpr, collect_func_walker, context);
}
if (subplan->args)
{
expression_tree_walker((Node*) subplan->args, collect_func_walker, context);
}
return false;
}
if (IsA(node, Const))
{
/*
* When constants are part of the expression we should dispatch
* the type of the constant.
*/
Const *var = (Const *) node;
prepareDispatchedCatalogType(context, var->consttype);
}
if (IsA(node, Var))
{
/*
* This handler is for user defined types.
* prepareDispatchedCatalogType() checks if the oid of this type
* is greater than FirstNormalObjectId so we do not need to test
* that here.
*/
Var *var = (Var *) node;
prepareDispatchedCatalogType(context, var->vartype);
}
if (IsA(node, RowExpr))
{
/*
* RowExpr is ROW(x,y, ...). It may appear as argument to a UDF
* in place of a composite type value. E.g.
*
* SELECT foo(ROW(x,y,z), ...) FROM ...
*
* Assumption: By the time we reach here, ROW() is mapped to the
* correct composite type given by RowExpr->row_typeid.
*/
RowExpr *rowexpr = (RowExpr *) node;
prepareDispatchedCatalogType(context, rowexpr->row_typeid);
/* TODO: Should we return false from here? */
}
if (IsA(node, FuncExpr))
{
FuncExpr *func = (FuncExpr *) node;
AclMode needAcl = ACL_NO_RIGHTS;
switch (func->funcid)
{
case SETVAL_FUNC_OID:
needAcl = ACL_UPDATE;
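/* fall through: setval() shares the ACL_USAGE check and sequence handling below */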
case NEXTVAL_FUNC_OID:
needAcl |= ACL_USAGE;
{
Const *arg;
Oid seqoid;
arg = (Const *) linitial(func->args);
if (arg->xpr.type == T_Const && arg->consttype == REGCLASSOID)
{
seqoid = DatumGetObjectId(arg->constvalue);
/*
* aclchecks for nextval are done on the segments. But in HAWQ
* we are executing as bootstrap user on the segments. So any
* aclchecks on segments defeat the purpose. Do the aclchecks
* on the master, prior to dispatch
*/
if (pg_class_aclcheck(seqoid, GetUserId(), needAcl) != ACLCHECK_OK)
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("permission denied for sequence %s",
get_rel_name(seqoid))));
prepareDispatchedCatalogSingleRelation(context,
seqoid, FALSE, 0);
}
else
{
ereport(ERROR, (errcode(ERRCODE_CDB_FEATURE_NOT_YET),
errmsg("Non const argument in function "
"\"NEXTVAL\" is not supported yet.")));
}
}
break;
default:
/* Check if this is a UDF*/
if(func->funcid > FirstNormalObjectId)
{
/* do aclcheck on master, because HAWQ segments lack user knowledge */
AclResult aclresult;
aclresult = pg_proc_aclcheck(func->funcid, GetUserId(), ACL_EXECUTE);
if (aclresult != ACLCHECK_OK)
aclcheck_error(aclresult, ACL_KIND_PROC, get_func_name(func->funcid));
/* build the dispatch for the function itself */
prepareDispatchedCatalogFunction(context, func->funcid);
}
/* Treat pg_database_size specially */
else if (PgDatabaseSizeOidProcId == func->funcid || PgDatabaseSizeNameProcId == func->funcid)
{
/* dispatch all the table space information for calculating database size */
prepareAllDispatchedCatalogTablespace(context);
}
break;
}
/*
* Continue recursion in expression_tree_walker as arguments subtree may
* have FuncExpr's. E.g. f(g() + h())
*/
return expression_tree_walker((Node *)func->args,
collect_func_walker, context);
}
if (IsA(node, Aggref))
{
Aggref *aggnode = (Aggref *) node;
if (aggnode->aggorder != NULL)
{
ListCell *lc;
foreach (lc, aggnode->aggorder->sortClause)
{
SortClause *sc = (SortClause *) lfirst(lc);
prepareDispatchedCatalogOperator(context, sc->sortop);
}
}
/*
* An aggregate has two catalog entries, one in pg_aggregate
* and the other in pg_proc. Both need to be dispatched to
* segments.
*/
prepareDispatchedCatalogAggregate(context, aggnode->aggfnoid);
prepareDispatchedCatalogFunction(context, aggnode->aggfnoid);
return expression_tree_walker((Node *)aggnode->args,
collect_func_walker, context);
}
if (IsA(node, WindowRef))
{
/*
* Window functions are nothing but aggregate functions
* invoked with "OVER(...)" clause. Therefore, treat them as
* aggregates.
*/
WindowRef * wnode = (WindowRef *) node;
prepareDispatchedCatalogAggregate(context, wnode->winfnoid);
prepareDispatchedCatalogFunction(context, wnode->winfnoid);
/*
* Should the return type of the aggregate (window) function be
* included explicitly?
*/
return expression_tree_walker((Node *)wnode->args,
collect_func_walker, context);
}
if (IsA(node, OpExpr))
{
OpExpr *op = (OpExpr *) node;
if (OidIsValid(op->opfuncid))
prepareDispatchedCatalogFunction(context, op->opfuncid);
if (OidIsValid(op->opno))
prepareDispatchedCatalogOperator(context, op->opno);
}
else if (IsA(node, RelabelType))
{
RelabelType *relabel = (RelabelType *) node;
prepareDispatchedCatalogType(context, relabel->resulttype);
}
else if (IsA(node, ConvertRowtypeExpr))
{
ConvertRowtypeExpr *expr = (ConvertRowtypeExpr *) node;
prepareDispatchedCatalogType(context, expr->resulttype);
}
else if (IsA(node, Sort))
{
Sort *sort = (Sort *) node;
int i;
for (i = 0; i < sort->numCols; i++)
{
prepareDispatchedCatalogOperator(context, sort->sortOperators[i]);
}
}
else if (IsA(node, SortClause))
{
SortClause *sc = (SortClause *) node;
prepareDispatchedCatalogOperator(context, sc->sortop);
}
else if (IsA(node, GroupClause))
{
GroupClause *gc = (GroupClause *) node;
prepareDispatchedCatalogOperator(context, gc->sortop);
}
return plan_tree_walker(node, collect_func_walker, context);
}
/*
* Add the pg_authid tuple for the current user to the QueryContext
*/
static void
prepareDispatchedCatalogAuthUser(QueryContextInfo *cxt, Oid userOid)
{
HeapTuple authtuple;
Assert(userOid != InvalidOid);
/*
* builtin object, dispatch nothing
*/
if (userOid < FirstNormalObjectId)
return;
if (alreadyAddedForDispatching(cxt->htab, userOid, ProcType))
return;
/* find user oid in pg_authid */
cqContext *pcqCtx;
pcqCtx = caql_beginscan(NULL,
cql("SELECT * FROM pg_authid "
"WHERE oid = :1 ",
ObjectIdGetDatum(userOid)));
authtuple = caql_getnext(pcqCtx);
if (!HeapTupleIsValid(authtuple))
elog(ERROR, "pg_authid lookup failed for user %u", userOid);
AddTupleToContextInfo(cxt, AuthIdRelationId, "pg_authid", authtuple, MASTER_CONTENT_ID);
caql_endscan(pcqCtx);
}
/*
* recursively scan the expression chain and collect function information
*/
static void
prepareDispatchedCatalogFunctionExpr(QueryContextInfo *cxt, Expr *expr)
{
Assert(NULL != cxt && NULL != expr);
collect_func_walker((Node *)expr, cxt);
}
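/*
* collect the pg_proc tuple for a user-defined function, together with its
* language, namespace and return type.
*/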
static void
prepareDispatchedCatalogFunction(QueryContextInfo *cxt, Oid procOid)
{
HeapTuple proctuple;
Datum datum;
Oid oidval;
bool isNull = false;
Assert(procOid != InvalidOid);
/*
* builtin object, dispatch nothing
*/
if (procOid < FirstNormalObjectId)
return;
if (alreadyAddedForDispatching(cxt->htab, procOid, ProcType))
return;
/* find the function in pg_proc */
cqContext *pcqCtx;
pcqCtx = caql_beginscan(NULL,
cql("SELECT * FROM pg_proc "
" WHERE oid = :1 ",
ObjectIdGetDatum(procOid)));
proctuple = caql_getnext(pcqCtx);
if (!HeapTupleIsValid(proctuple))
elog(ERROR, "cache lookup failed for proc %u", procOid);
datum = caql_getattr(pcqCtx, Anum_pg_proc_prolang, &isNull);
if (!isNull && datum)
{
oidval = DatumGetObjectId(datum);
prepareDispatchedCatalogLanguage(cxt, oidval);
}
datum = caql_getattr(pcqCtx, Anum_pg_proc_pronamespace, &isNull);
if (!isNull && datum)
{
oidval = DatumGetObjectId(datum);
prepareDispatchedCatalogNamespace(cxt, oidval);
}
datum = caql_getattr(pcqCtx, Anum_pg_proc_prorettype, &isNull);
if (!isNull && datum)
{
oidval = DatumGetObjectId(datum);
prepareDispatchedCatalogType(cxt, oidval);
}
AddTupleWithToastsToContextInfo(cxt, ProcedureRelationId, "pg_proc", proctuple, MASTER_CONTENT_ID);
caql_endscan(pcqCtx);
}
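/*
* collect the pg_language tuple for a user-defined language and dispatch
* its handler function as well.
*/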
static void
prepareDispatchedCatalogLanguage(QueryContextInfo *ctx, Oid langOid)
{
HeapTuple langtuple;
Assert(langOid != InvalidOid);
/*
* builtin object, dispatch nothing
*/
if (langOid < FirstNormalObjectId)
return;
if (alreadyAddedForDispatching(ctx->htab, langOid, LangType))
return;
/* look up the tuple in pg_language */
cqContext *pcqCtx;
pcqCtx = caql_beginscan(NULL,
cql("SELECT * FROM pg_language "
" WHERE oid = :1 ",
ObjectIdGetDatum(langOid)));
langtuple = caql_getnext(pcqCtx);
if (!HeapTupleIsValid(langtuple))
elog(ERROR, "cache lookup failed for lang %u", langOid);
AddTupleWithToastsToContextInfo(ctx, LanguageRelationId, "pg_language", langtuple, MASTER_CONTENT_ID);
/* dispatch the language handler function*/
bool lang_handler_isNull = false;
Datum lang_handler_Datum = caql_getattr(pcqCtx, Anum_pg_language_lanplcallfoid, &lang_handler_isNull);
if (!lang_handler_isNull && lang_handler_Datum)
{
prepareDispatchedCatalogFunction(ctx, DatumGetObjectId(lang_handler_Datum));
}
caql_endscan(pcqCtx);
}
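/*
* collect the pg_type tuple for a user-defined type, together with its
* element type, I/O functions, underlying relation (for composite types)
* and associated operator classes.
*/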
static void
prepareDispatchedCatalogType(QueryContextInfo *ctx, Oid typeOid)
{
HeapTuple typetuple;
bool isNull = false;
Datum typeDatum = 0;
Assert(typeOid != InvalidOid);
/*
* builtin object, dispatch nothing
*/
if (typeOid < FirstNormalObjectId)
return;
if (alreadyAddedForDispatching(ctx->htab, typeOid, TypeType))
return;
/* look up the tuple in pg_type */
cqContext *pcqCtx;
pcqCtx = caql_beginscan(NULL,
cql("SELECT * FROM pg_type "
" WHERE oid = :1 ",
ObjectIdGetDatum(typeOid)));
typetuple = caql_getnext(pcqCtx);
if (!HeapTupleIsValid(typetuple))
elog(ERROR, "cache lookup failed for type %u", typeOid);
/*
* In case of arrays of user defined types we should also
* dispatch the underlying type defined by typelem attribute
* if it is also a user defined type.
*/
typeDatum = caql_getattr(pcqCtx, Anum_pg_type_typelem, &isNull);
if (!isNull && OidIsValid(DatumGetObjectId(typeDatum)))
{
prepareDispatchedCatalogType(ctx, DatumGetObjectId(typeDatum));
}
/*
* We need to dispatch all the accompanying functions for user defined types.
*/
typeDatum = caql_getattr(pcqCtx, Anum_pg_type_typinput, &isNull);
if (!isNull && OidIsValid(DatumGetObjectId(typeDatum)))
{
prepareDispatchedCatalogFunction(ctx, DatumGetObjectId(typeDatum));
}
typeDatum = caql_getattr(pcqCtx, Anum_pg_type_typoutput, &isNull);
if (!isNull && OidIsValid(DatumGetObjectId(typeDatum)))
{
prepareDispatchedCatalogFunction(ctx, DatumGetObjectId(typeDatum));
}
typeDatum = caql_getattr(pcqCtx, Anum_pg_type_typreceive, &isNull);
if (!isNull && OidIsValid(DatumGetObjectId(typeDatum)))
{
prepareDispatchedCatalogFunction(ctx, DatumGetObjectId(typeDatum));
}
typeDatum = caql_getattr(pcqCtx, Anum_pg_type_typsend, &isNull);
if (!isNull && OidIsValid(DatumGetObjectId(typeDatum)))
{
prepareDispatchedCatalogFunction(ctx, DatumGetObjectId(typeDatum));
}
AddTupleWithToastsToContextInfo(ctx, TypeRelationId, "pg_type", typetuple,
MASTER_CONTENT_ID);
/*
* ROW(f1, f2, ...) expression may not have corresponding pg_type
* tuple and related metadata included in the context. If it is
* already included, the following is a no-op.
*/
typeDatum = caql_getattr(pcqCtx, Anum_pg_type_typrelid, &isNull);
if (!isNull && OidIsValid(DatumGetObjectId(typeDatum)))
{
prepareDispatchedCatalogSingleRelation(ctx, DatumGetObjectId(typeDatum),
FALSE, 0);
}
caql_endscan(pcqCtx);
/*
* For user defined types we also dispatch associated operator classes.
* This is to safeguard against certain cases like Sort Operator and
* Hash Aggregates where the plan tree does not explicitly contain all the
* operators that might be needed by the executor on the segments.
*/
prepareDispatchedCatalogOpClassForType(ctx, typeOid);
}
/* For user defined type dispatch the operator class. */
static void
prepareDispatchedCatalogOpClassForType(QueryContextInfo *ctx, Oid typeOid)
{
cqContext *pcqCtx;
HeapTuple tuple;
Datum value;
if (!OidIsValid(typeOid))
return;
/*
* builtin object, dispatch nothing.
*/
if (typeOid < FirstNormalObjectId)
return;
pcqCtx = caql_beginscan(NULL,
cql("SELECT oid FROM pg_opclass"
" WHERE opcintype = :1",
ObjectIdGetDatum(typeOid)));
while (HeapTupleIsValid(tuple = caql_getnext(pcqCtx)))
{
value = HeapTupleGetOid(tuple);
if (OidIsValid(DatumGetObjectId(value)))
prepareDispatchedCatalogOpClass(ctx, DatumGetObjectId(value));
}
caql_endscan(pcqCtx);
}
/*
* This function does not do the actual work. It checks whether
* there is an operator class associated with this operator: if one is
* found it delegates the work to prepareDispatchedCatalogOpClass, otherwise
* it delegates the work to addOperatorToDispatchContext.
*/
static void
prepareDispatchedCatalogOperator(QueryContextInfo *ctx, Oid opOid)
{
cqContext *pcqCtx;
bool isnull;
bool foundOperatorClass = false;
/*
* builtin object, dispatch nothing.
*/
if (opOid < FirstNormalObjectId)
return;
if (alreadyAddedForDispatching(ctx->htab, opOid, OpType))
return;
/*
* Check if we have an operator class associated with this operator.
* If yes, prepareDispatchedCatalogOpClass will handle adding this operator
* to the context.
*/
pcqCtx = caql_beginscan(NULL,
cql("SELECT * FROM pg_amop"
" WHERE amopopr = :1",
ObjectIdGetDatum(opOid)));
while (HeapTupleIsValid(caql_getnext(pcqCtx)))
{
foundOperatorClass = true;
Oid opclass;
opclass = DatumGetObjectId(
caql_getattr(pcqCtx, Anum_pg_amop_amopclaid, &isnull));
prepareDispatchedCatalogOpClass(ctx, opclass);
}
caql_endscan(pcqCtx);
/*
* Otherwise, call addOperatorToDispatchContext to add this operator from
* here and bail out.
*/
if (!foundOperatorClass)
{
addOperatorToDispatchContext(ctx, opOid);
}
}
/* just add the pg_operator tuple and associated operator functions */
static void
addOperatorToDispatchContext(QueryContextInfo *ctx, Oid opOid)
{
cqContext *pcqCtx;
HeapTuple tuple;
Datum value;
bool isnull;
/*
* builtin object, dispatch nothing.
*/
if (opOid < FirstNormalObjectId)
return;
if (alreadyAddedForDispatching(ctx->htab, opOid, OpType))
return;
pcqCtx = caql_beginscan(NULL,
cql("SELECT * FROM pg_operator"
" WHERE oid = :1",
ObjectIdGetDatum(opOid)));
tuple = caql_getnext(pcqCtx);
if (!HeapTupleIsValid(tuple))
elog(ERROR, "cache lookup failed for operator %u", opOid);
AddTupleToContextInfo(ctx, OperatorRelationId, "pg_operator", tuple,
MASTER_CONTENT_ID);
value = caql_getattr(pcqCtx, Anum_pg_operator_oprcode, &isnull);
if (!isnull && OidIsValid(DatumGetObjectId(value)))
prepareDispatchedCatalogFunction(ctx, DatumGetObjectId(value));
/* sort operators */
value = caql_getattr(pcqCtx, Anum_pg_operator_oprlsortop, &isnull);
if (!isnull && OidIsValid(DatumGetObjectId(value)))
addOperatorToDispatchContext(ctx, DatumGetObjectId(value));
value = caql_getattr(pcqCtx, Anum_pg_operator_oprrsortop, &isnull);
if (!isnull && OidIsValid(DatumGetObjectId(value)))
addOperatorToDispatchContext(ctx, DatumGetObjectId(value));
/* merge join LT compare operator */
value = caql_getattr(pcqCtx, Anum_pg_operator_oprltcmpop, &isnull);
if (!isnull && OidIsValid(DatumGetObjectId(value)))
{
Oid ltproc;
ltproc = get_opcode(DatumGetObjectId(value));
if (OidIsValid(ltproc))
prepareDispatchedCatalogFunction(ctx, ltproc);
}
/* merge join GT compare operator */
value = caql_getattr(pcqCtx, Anum_pg_operator_oprgtcmpop, &isnull);
if (!isnull && OidIsValid(DatumGetObjectId(value)))
{
Oid gtproc;
gtproc = get_opcode(DatumGetObjectId(value));
if (OidIsValid(gtproc))
prepareDispatchedCatalogFunction(ctx, gtproc);
}
caql_endscan(pcqCtx);
}
/*
* For opclass, we dispatch pg_opclass, pg_amproc, and pg_amop tuples.
* Loop through all the underlying operators and dispatch them too.
*/
static void
prepareDispatchedCatalogOpClass(QueryContextInfo *ctx, Oid opclass)
{
cqContext *pcqCtx;
HeapTuple tuple;
Datum value;
bool isnull;
/*
* builtin object, dispatch nothing.
*/
if (!OidIsValid(opclass))
return;
if (opclass < FirstNormalObjectId)
return;
if (alreadyAddedForDispatching(ctx->htab, opclass, OpClassType))
return;
/* pg_opclass */
pcqCtx = caql_beginscan(NULL,
cql("SELECT * FROM pg_opclass"
" WHERE oid = :1",
ObjectIdGetDatum(opclass)));
tuple = caql_getnext(pcqCtx);
if (!HeapTupleIsValid(tuple))
elog(ERROR, "cache lookup failed for operator class %u", opclass);
AddTupleToContextInfo(ctx, OperatorClassRelationId, "pg_opclass",
tuple, MASTER_CONTENT_ID);
caql_endscan(pcqCtx);
/* pg_amproc */
pcqCtx = caql_beginscan(NULL,
cql("SELECT * FROM pg_amproc"
" WHERE amopclaid = :1",
ObjectIdGetDatum(opclass)));
while (HeapTupleIsValid(tuple = caql_getnext(pcqCtx)))
{
AddTupleToContextInfo(ctx, AccessMethodProcedureRelationId,
"pg_amproc", tuple, MASTER_CONTENT_ID);
value = caql_getattr(pcqCtx, Anum_pg_amproc_amproc, &isnull);
if (!isnull && OidIsValid(DatumGetObjectId(value)))
prepareDispatchedCatalogFunction(ctx, DatumGetObjectId(value));
}
caql_endscan(pcqCtx);
/* pg_amop and dispatch all underlying operators */
/*
* An ordinary operator expression needs nothing beyond its underlying
* function, but a hash join (HJ) also needs the hash function information.
* Since it is almost impossible to know here whether this operator will be
* used for a hash join or something else, we simply send everything that
* might be needed.
*/
pcqCtx = caql_beginscan(NULL,
cql("SELECT * FROM pg_amop"
" WHERE amopclaid = :1",
ObjectIdGetDatum(opclass)));
while (HeapTupleIsValid(tuple = caql_getnext(pcqCtx)))
{
AddTupleToContextInfo(ctx, AccessMethodOperatorRelationId, "pg_amop",
tuple, MASTER_CONTENT_ID);
value = caql_getattr(pcqCtx, Anum_pg_amop_amopopr, &isnull);
if (!isnull && OidIsValid(DatumGetObjectId(value)))
{
addOperatorToDispatchContext(ctx, DatumGetObjectId(value));
}
}
caql_endscan(pcqCtx);
}
static void
prepareDispatchedCatalogAggregate(QueryContextInfo *cxt, Oid aggfuncoid)
{
HeapTuple aggtuple;
bool isNull = false;
Datum procDatum = 0;
Assert(aggfuncoid != InvalidOid);
/*
* builtin object, dispatch nothing.
*/
if (aggfuncoid < FirstNormalObjectId)
return;
if (alreadyAddedForDispatching(cxt->htab, aggfuncoid, AggType))
return;
/* look up the pg_aggregate tuple for this aggregate */
cqContext *pcqCtx;
pcqCtx = caql_beginscan(NULL,
cql("SELECT * FROM pg_aggregate "
" WHERE aggfnoid = :1 ",
ObjectIdGetDatum(aggfuncoid)));
aggtuple = caql_getnext(pcqCtx);
if (!HeapTupleIsValid(aggtuple))
elog(ERROR, "cache lookup failed for aggregate %u", aggfuncoid);
/*
* Include in the query context all pg_proc tuples this aggregate may
* refer to. agginvtransfn and agginvprelimfn are excluded because they
* are not used anywhere in the codebase.
*/
procDatum = caql_getattr(pcqCtx, Anum_pg_aggregate_aggtransfn, &isNull);
if (!isNull && procDatum)
{
prepareDispatchedCatalogFunction(cxt, DatumGetObjectId(procDatum));
}
procDatum = caql_getattr(pcqCtx, Anum_pg_aggregate_aggprelimfn, &isNull);
if (!isNull && procDatum)
{
prepareDispatchedCatalogFunction(cxt, DatumGetObjectId(procDatum));
}
procDatum = caql_getattr(pcqCtx, Anum_pg_aggregate_aggfinalfn, &isNull);
if (!isNull && procDatum)
{
prepareDispatchedCatalogFunction(cxt, DatumGetObjectId(procDatum));
}
AddTupleWithToastsToContextInfo(cxt, AggregateRelationId, "pg_aggregate",
aggtuple, MASTER_CONTENT_ID);
caql_endscan(pcqCtx);
}
static List *
findSegnofromMap(Oid relid, List *segnoMaps)
{
ListCell *relid_to_segno;
List *mysegnos = NIL;
foreach(relid_to_segno, segnoMaps)
{
SegfileMapNode *n = (SegfileMapNode *) lfirst(relid_to_segno);
if (n->relid == relid)
{
Assert(n->segnos != NIL);
mysegnos = n->segnos;
break;
}
}
return mysegnos;
}
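/*
 * Illustrative sketch (an assumption based only on how segnoMaps is consumed
 * above, not taken from this file): a segnoMaps entry pairs a result
 * relation with its chosen segment file numbers, roughly
 *
 *     SegfileMapNode *n = makeNode(SegfileMapNode);
 *     n->relid  = relid;
 *     n->segnos = list_make1_int(segno);
 *     segnoMaps = lappend(segnoMaps, n);
 *
 * findSegnofromMap() then returns n->segnos for a matching relid, or NIL
 * when the relation is not in the map.
 */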
/*
* parse the relation and collect the metadata used by the QEs; add it to
* the in-memory heap table for the dispatcher.
*/
void
prepareDispatchedCatalogRelation(QueryContextInfo *cxt, Oid relid,
bool forInsert, List *segnoMaps)
{
List *children = NIL;
ListCell *child;
if (rel_is_partitioned(relid))
{
children = find_all_inheritors(relid);
foreach(child, children)
{
Oid myrelid = lfirst_oid(child);
if (forInsert)
{
List *mysegnos = findSegnofromMap(myrelid, segnoMaps);
Assert(mysegnos != NIL);
prepareDispatchedCatalogSingleRelation(cxt, myrelid, TRUE,
mysegnos);
}
else
{
prepareDispatchedCatalogSingleRelation(cxt, myrelid, FALSE, 0);
}
}
list_free(children);
}
else
{
if (forInsert && NULL != segnoMaps)
{
List *mysegnos = findSegnofromMap(relid, segnoMaps);
Assert(mysegnos != NIL);
prepareDispatchedCatalogSingleRelation(cxt, relid, TRUE, mysegnos);
}
else
{
prepareDispatchedCatalogSingleRelation(cxt, relid, FALSE, 0);
}
}
}
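/*
 * Illustrative sketch (hypothetical caller, not taken from this file): a
 * read-only reference dispatches the relation without any segment file map,
 * while an INSERT target supplies the per-relation segno map, e.g.
 *
 *     prepareDispatchedCatalogRelation(cxt, relid, FALSE, NULL);
 *     prepareDispatchedCatalogRelation(cxt, relid, TRUE, segnoMaps);
 *
 * For a partitioned table the function fans out to every child returned by
 * find_all_inheritors().
 */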
/*
* parse the target list and dispatch metadata for any functions it references.
*/
void
prepareDispatchedCatalogTargets(QueryContextInfo *cxt, List *targets)
{
ListCell *lc;
Assert(NULL != targets);
Assert(NULL != cxt);
foreach(lc, targets)
{
TargetEntry *te = lfirst(lc);
if (te->resjunk)
continue;
if (te->expr)
prepareDispatchedCatalogFunctionExpr(cxt, te->expr);
}
}
/*
* walk the plan and dispatch metadata for functions referenced in its targets.
*/
void
prepareDispatchedCatalogPlan(QueryContextInfo *cxt, Plan *plan)
{
if (!plan)
return;
Assert(NULL != cxt);
/*
* Add the session user's pg_authid tuple to the context.
*/
prepareDispatchedCatalogAuthUser(cxt, GetSessionUserId());
collect_func_walker((Node *)plan, cxt);
}
/*
* parse range table and collect metadata used for QE,
* add them to in-memory heap table for dispatcher.
*/
void
prepareDispatchedCatalog(QueryContextInfo *cxt, List *rtable)
{
ListCell *lc;
Assert(NULL != rtable);
Assert(NULL != cxt);
foreach(lc, rtable)
{
RangeTblEntry *rte = lfirst(lc);
switch (rte->rtekind)
{
case RTE_RELATION: /*ordinary relation reference */
/*
* Relations in range table are already included by calls to
* prepareDispatchedCatalogRelation from ExecutorStart().
*/
break;
case RTE_FUNCTION: /*function in FROM */
if (rte->funcexpr)
prepareDispatchedCatalogFunctionExpr(cxt, (Expr *)rte->funcexpr);
break;
case RTE_SUBQUERY: /*subquery in FROM */
if (rte->subquery)
{
List *sub_rtable = rte->subquery->rtable;
List *sub_targets = rte->subquery->targetList;
if (sub_rtable)
prepareDispatchedCatalog(cxt, sub_rtable);
if (sub_targets)
prepareDispatchedCatalogTargets(cxt, sub_targets);
}
break;
case RTE_VALUES: /*VALUES (<exprlist>), (<exprlist>), ... */
if (rte->values_lists)
{
ListCell *lc;
foreach(lc, rte->values_lists)
{
List *exprs = lfirst(lc);
if (exprs)
{
ListCell *cell;
/* each element of values_lists is itself a list of expressions */
foreach(cell, exprs)
{
Expr *expr = lfirst(cell);
prepareDispatchedCatalogFunctionExpr(cxt, expr);
}
}
}
}
break;
case RTE_JOIN: /*join */
case RTE_VOID: /*CDB: deleted RTE */
break;
case RTE_SPECIAL: /*special rule relation (NEW or OLD) */
elog(ERROR, "dispatch metedata for RTE_SPECIAL node is not implemented");
break;
case RTE_CTE: /*CommonTableExpr in FROM */
/* nothing to do for cte */
break;
case RTE_TABLEFUNCTION: /*CDB: Functions over multiset input */
/*TODO */
elog(ERROR, "dispatch metedata for RTE_TABLEFUNCTION node is not implemented");
break;
default:
Insist(!"never get here");
break;
}
}
}
/*
* create a hash table used to filter out catalog objects that have already
* been dispatched, so each one is added to the context only once.
*/
HTAB *
createPrepareDispatchedCatalogRelationDisctinctHashTable(void)
{
HASHCTL info;
HTAB *rels = NULL;
MemSet(&info, 0, sizeof(info));
info.hash = tag_hash;
info.keysize = sizeof(QueryContextDispatchingHashKey);
info.entrysize = sizeof(QueryContextDispatchingHashEntry);
rels = hash_create("all relations", 10, &info, HASH_FUNCTION | HASH_ELEM);
return rels;
}
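/*
 * Illustrative sketch (assumption: this mirrors what the
 * alreadyAddedForDispatching() helper used throughout this file does):
 * callers probe the table with a QueryContextDispatchingHashKey so that
 * each catalog object is dispatched at most once, e.g.
 *
 *     QueryContextDispatchingHashKey key;
 *     bool found;
 *     key.objid = objid;
 *     key.type = RelationType;
 *     hash_search(rels, &key, HASH_ENTER, &found);
 *     if (found)
 *         return;            (already dispatched, nothing to do)
 */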
QueryContextDispatchingSendBack
CreateQueryContextDispatchingSendBack(int nfile)
{
QueryContextDispatchingSendBack rc =
palloc0(sizeof(struct QueryContextDispatchingSendBackData));
rc->numfiles = nfile;
rc->eof = palloc0(nfile * sizeof(int64));
rc->uncompressed_eof = palloc0(nfile * sizeof(int64));
return rc;
}
void
DropQueryContextDispatchingSendBack(QueryContextDispatchingSendBack sendback)
{
if (sendback)
{
if (sendback->eof)
pfree(sendback->eof);
if (sendback->uncompressed_eof)
pfree(sendback->uncompressed_eof);
pfree(sendback);
}
}
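/*
 * Illustrative sketch (hypothetical QE-side usage): a sendback is sized by
 * the number of segment files written, filled in by the writer, serialized
 * with the Sendback functions below, and then released:
 *
 *     QueryContextDispatchingSendBack sb = CreateQueryContextDispatchingSendBack(1);
 *     sb->relid = relid;
 *     sb->segno = segno;
 *     sb->insertCount = tuplesInserted;
 *     sb->eof[0] = eofAfterWrite;
 *     sb->uncompressed_eof[0] = uncompressedEof;
 *     ... hand sb to AddSendbackChangedCatalogContent() ...
 *     DropQueryContextDispatchingSendBack(sb);
 *
 * The field names (relid, segno, insertCount, eof, uncompressed_eof) are the
 * ones consumed by UpdateCatalogModifiedOnSegments() and
 * AddSendbackChangedCatalogContent() below; tuplesInserted, eofAfterWrite and
 * uncompressedEof are placeholder variables for this sketch.
 */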
void
UpdateCatalogModifiedOnSegments(QueryContextDispatchingSendBack sendback)
{
Assert(NULL != sendback);
AppendOnlyEntry *aoEntry = GetAppendOnlyEntry(sendback->relid, SnapshotNow);
Assert(aoEntry != NULL);
Relation rel = heap_open(sendback->relid, AccessShareLock);
if (RelationIsAoRows(rel))
{
Insist(sendback->numfiles == 1);
UpdateFileSegInfo(rel, aoEntry, sendback->segno,
sendback->eof[0], sendback->uncompressed_eof[0], sendback->insertCount,
sendback->varblock);
}
else if (RelationIsParquet(rel))
{
Insist(sendback->numfiles == 1);
UpdateParquetFileSegInfo(rel, aoEntry, sendback->segno,
sendback->eof[0], sendback->uncompressed_eof[0], sendback->insertCount);
}
heap_close(rel, AccessShareLock);
/*
* make the change available
*/
CommandCounterIncrement();
}
StringInfo
PreSendbackChangedCatalog(int aocount)
{
StringInfo buf = makeStringInfo();
pq_beginmessage(buf, 'h');
/*
* 2, send number of relations
*/
pq_sendint(buf, aocount, 4);
return buf;
}
void
AddSendbackChangedCatalogContent(StringInfo buf,
QueryContextDispatchingSendBack sendback)
{
int i;
/*
* 3, send relation id
*/
pq_sendint(buf, sendback->relid, 4);
/*
* 4, send insertCount
*/
pq_sendint64(buf, sendback->insertCount);
/*
* 5, send segment file no.
*/
pq_sendint(buf, sendback->segno, 4);
/*
* 6, send varblock count
*/
pq_sendint64(buf, sendback->varblock);
/*
* 7, send number of files
*/
pq_sendint(buf, sendback->numfiles, 4);
for (i = 0; i < sendback->numfiles; ++i)
{
/*
* 8, send eof
*/
pq_sendint64(buf, sendback->eof[i]);
/*
* 9, send uncompressed eof
*/
pq_sendint64(buf, sendback->uncompressed_eof[i]);
}
}
void
FinishSendbackChangedCatalog(StringInfo buf)
{
pq_endmessage(buf);
pfree(buf);
}
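/*
 * Illustrative sketch (hypothetical caller): the three functions above
 * compose into a single 'h' message covering every changed append-only
 * relation, e.g.
 *
 *     StringInfo buf = PreSendbackChangedCatalog(list_length(sendbacks));
 *     foreach(lc, sendbacks)
 *         AddSendbackChangedCatalogContent(buf, lfirst(lc));
 *     FinishSendbackChangedCatalog(buf);
 *
 * "sendbacks" and "lc" are placeholder names; the receiver must decode the
 * fields in exactly the order AddSendbackChangedCatalogContent() writes them.
 */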
static void GetExtTableLocationsArray(HeapTuple tuple,
TupleDesc exttab_desc,
Datum **array,
int *array_size)
{
Datum locations;
bool isNull = false;
locations = heap_getattr(tuple,
Anum_pg_exttable_location,
exttab_desc,
&isNull);
Insist(!isNull);
deconstruct_array(DatumGetArrayTypeP(locations),
TEXTOID, -1, false, 'i',
array, NULL, array_size);
Insist(*array_size > 0);
}
static char* GetExtTableFirstLocation(Datum *array)
{
return DatumGetCString(DirectFunctionCall1(textout, array[0]));
}
/*
* Using HAWQ's HDFS location, dispatch HDFS credentials to the
* segments.
*
* The function uses an HDFS URI of the form hdfs://host:port/path
* or hdfs://nameservice/path; this value is taken from
* pg_filespace_entry, which is populated from the hawq_dfs_url
* entry in hawq-site.xml.
*
* prepareDispatchedCatalogFileSystemCredential will store the token
* with port == 0 in the HA case (otherwise with the supplied port).
*/
static void AddFileSystemCredentialForPxfTable()
{
char* dfs_address = NULL;
if (!enable_secure_filesystem)
return;
get_hdfs_location_from_filespace(&dfs_address);
elog(DEBUG2, "about to acquire delegation token for %s", dfs_address);
prepareDispatchedCatalogFileSystemCredential(dfs_address);
pfree(dfs_address);
}
List *
fetchSegFileInfos(Oid relid, List *segnos)
{
List *result = NIL;
if ((segnos != NIL) && (list_length(segnos) > 0))
{
ListCell *lc;
char storageChar;
AppendOnlyEntry *aoEntry = NULL;
foreach (lc, segnos)
{
int segno = lfirst_int(lc);
ResultRelSegFileInfo *segfileinfo = makeNode(ResultRelSegFileInfo);
segfileinfo->segno = segno;
result = lappend(result, segfileinfo);
}
storageChar = get_relation_storage_type(relid);
/*
* Based on the pg_appendonly information, fetch
* the segment file information associated with
* this relation and this segno.
*/
if (RELSTORAGE_AOROWS == storageChar)
{
/*
* Get pg_appendonly information for this table.
*/
aoEntry = GetAppendOnlyEntry(relid, SnapshotNow);
AOFetchSegFileInfo(aoEntry, result, SnapshotNow);
}
else if (RELSTORAGE_PARQUET == storageChar)
{
/*
* Get pg_appendonly information for this table.
*/
aoEntry = GetAppendOnlyEntry(relid, SnapshotNow);
ParquetFetchSegFileInfo(aoEntry, result, SnapshotNow);
}
else
{
/*
* Get range info for the current magma hash table; it is already
* available from the block-location lookup and data-locality computation.
*/
/*
* TODO(Zongtian): vsegno -> rg list -> {range list1, ..., range list N}
*/
Assert(RELSTORAGE_EXTERNAL == storageChar);
}
}
return result;
}
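/*
 * Illustrative sketch (hypothetical caller): to fetch the segment file
 * information for one segno of an append-only or parquet relation:
 *
 *     List *infos = fetchSegFileInfos(relid, list_make1_int(segno));
 *
 * Each element is a ResultRelSegFileInfo whose fields beyond segno are
 * filled in by AOFetchSegFileInfo() or ParquetFetchSegFileInfo().
 */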
static ResultRelSegFileInfoMapNode *
fetchSegFileInfoMapNode(Oid relid, List *segnos)
{
ResultRelSegFileInfoMapNode *result = makeNode(ResultRelSegFileInfoMapNode);
result->relid = relid;
result->segfileinfos = fetchSegFileInfos(relid, segnos);
return result;
}
/*
* GetResultRelSegFileInfos
*
* Given the relid and its segnos for the result relation,
* fetch the segment file information associated with those
* segment numbers.
*/
List *
GetResultRelSegFileInfos(Oid relid, List *segnomaps, List *existing_seginfomaps)
{
ResultRelSegFileInfoMapNode *result = NULL;
if (rel_is_partitioned(relid))
{
List *children = NIL;
ListCell *child;
children = find_all_inheritors(relid);
foreach (child, children)
{
Oid myrelid = lfirst_oid(child);
List *mysegnos = findSegnofromMap(myrelid, segnomaps);
result = fetchSegFileInfoMapNode(myrelid, mysegnos);
existing_seginfomaps = lappend(existing_seginfomaps, result);
}
list_free(children);
}
else
{
List *mysegnos = findSegnofromMap(relid, segnomaps);
result = fetchSegFileInfoMapNode(relid, mysegnos);
existing_seginfomaps = lappend(existing_seginfomaps, result);
}
return existing_seginfomaps;
}
/*
* prepareDfsAddressForDispatch
*
* Given the cxt, take the currently set value of cxt->sharedPath
* and add it to the context buffer so that it is dispatched from
* the master to the segments. This is only required in the case of
* a secure filesystem.
*/
static void
prepareDfsAddressForDispatch(QueryContextInfo* cxt)
{
if (!enable_secure_filesystem)
return;
const char *namespace = cxt->sharedPath;
elog(DEBUG2, "*********************** MASTER SHARED PATH VALUE %s *********************", cxt->sharedPath);
/*
* Need to account for '\0' when dispatching length
*/
int size = strlen(namespace) + 1;
StringInfoData buffer;
initStringInfo(&buffer);
pq_sendint(&buffer, (int) Namespace, sizeof(char));
pq_sendint(&buffer, size, sizeof(int));
WriteData(cxt, buffer.data, buffer.len);
WriteData(cxt, namespace, size);
pfree(buffer.data);
}
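/*
 * Wire format written by prepareDfsAddressForDispatch(): a one-byte item
 * type (Namespace), a four-byte length that includes the terminating '\0',
 * and then the shared path bytes themselves. A hedged sketch of how a
 * receiver might decode it (the actual QE-side reader lives elsewhere):
 *
 *     char type = pq_getmsgint(msg, sizeof(char));
 *     int len = pq_getmsgint(msg, sizeof(int));
 *     const char *path = pq_getmsgbytes(msg, len);
 *
 * pq_getmsgint() and pq_getmsgbytes() are the standard libpq/pqformat
 * counterparts of the pq_sendint() calls used above.
 */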