blob: da160cefc4a9a6361a9435394253b35d6b625794 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*-------------------------------------------------------------------------
*
* aosegfiles.c
* routines to support manipulation of the pg_aoseg_<oid> relation
* that accompanies each Append Only relation.
*
* Portions Copyright (c) 2008, Greenplum Inc
* Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "funcapi.h"
#include "miscadmin.h"
#include "access/heapam.h"
#include "access/genam.h"
#include "access/aosegfiles.h"
#include "catalog/pg_type.h"
#include "catalog/pg_proc.h"
#include "catalog/dependency.h"
#include "catalog/indexing.h"
#include "catalog/namespace.h"
#include "catalog/gp_fastsequence.h"
#include "catalog/aoseg.h"
#include "cdb/cdbvars.h"
#include "executor/spi.h"
#include "nodes/makefuncs.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/syscache.h"
#include "utils/fmgroids.h"
#include "utils/numeric.h"
#include "access/filesplit.h"
#include "access/parquetsegfiles.h"
static Datum ao_compression_ratio_internal(Oid relid);
/* ------------------------------------------------------------------------
*
* FUNCTIONS FOR MANIPULATING AND QUERYING AO SEGMENT FILE CATALOG TABLES
*
* ------------------------------------------------------------------------
*/
/*
 * NewFileSegInfo
 *
 * Allocate a zero-filled FileSegInfo (palloc0) and record 'segno' in it.
 * All other fields are left at zero.
 */
FileSegInfo *
NewFileSegInfo(int segno)
{
	FileSegInfo *newinfo = (FileSegInfo *) palloc0(sizeof(FileSegInfo));

	newinfo->segno = segno;

	return newinfo;
}
/*
 * InsertInitialSegnoEntry
 *
 * Adds the initial entry for segment file 'segno' into the pg_aoseg_*
 * table of this Append Only relation. frozen_heap_insert is used so the
 * tuple is frozen on insert. The tupcount, varblockcount, eof and
 * eofuncompressed columns all start at zero.
 *
 * A gp_fastsequence entry for the segment file is always inserted first.
 * For segno 0 that is all that happens: no pg_aoseg row is added.
 *
 * The pg_aoseg index is opened, and CatalogUpdateIndexes is called, only
 * when Gp_role is not GP_ROLE_EXECUTE.
 */
void
InsertInitialSegnoEntry(AppendOnlyEntry *aoEntry, int segno)
{
	Relation	pg_aoseg_rel;
	Relation	pg_aoseg_idx;
	TupleDesc	pg_aoseg_dsc;
	HeapTuple	pg_aoseg_tuple = NULL;
	int			natts = 0;
	bool	   *nulls;
	Datum	   *values;
	ItemPointerData tid;

	Assert(aoEntry != NULL);

	InsertFastSequenceEntry(aoEntry->segrelid,
							(int64)segno,
							0,
							&tid);

	if (segno == 0)
	{
		return;
	}

	pg_aoseg_rel = heap_open(aoEntry->segrelid, RowExclusiveLock);
	pg_aoseg_dsc = RelationGetDescr(pg_aoseg_rel);
	natts = pg_aoseg_dsc->natts;

	/*
	 * Both arrays start zero-filled: no attribute is NULL and every Datum
	 * is 0 until set below. palloc0 sizes the clear by the element type,
	 * instead of clearing a bool array with a sizeof(char)-based length.
	 */
	nulls = palloc0(sizeof(bool) * natts);
	values = palloc0(sizeof(Datum) * natts);

	if (Gp_role != GP_ROLE_EXECUTE)
		pg_aoseg_idx = index_open(aoEntry->segidxid, RowExclusiveLock);
	else
		pg_aoseg_idx = NULL;

	values[Anum_pg_aoseg_segno - 1] = Int32GetDatum(segno);
	values[Anum_pg_aoseg_tupcount - 1] = Float8GetDatum(0);
	values[Anum_pg_aoseg_varblockcount - 1] = Float8GetDatum(0);
	values[Anum_pg_aoseg_eof - 1] = Float8GetDatum(0);
	values[Anum_pg_aoseg_eofuncompressed - 1] = Float8GetDatum(0);

	/*
	 * form the tuple and insert it
	 */
	pg_aoseg_tuple = heap_form_tuple(pg_aoseg_dsc, values, nulls);
	if (!HeapTupleIsValid(pg_aoseg_tuple))
		elog(ERROR, "failed to build AO file segment tuple");

	frozen_heap_insert(pg_aoseg_rel, pg_aoseg_tuple);

	if (Gp_role != GP_ROLE_EXECUTE)
		CatalogUpdateIndexes(pg_aoseg_rel, pg_aoseg_tuple);

	heap_freetuple(pg_aoseg_tuple);

	if (Gp_role != GP_ROLE_EXECUTE)
		index_close(pg_aoseg_idx, RowExclusiveLock);
	heap_close(pg_aoseg_rel, RowExclusiveLock);
}
/*
 * GetFileSegInfo
 *
 * Get the catalog entry for segment file 'segno' of an appendonly
 * (row-oriented) relation from the pg_aoseg_* relation that belongs to
 * the currently used AppendOnly table.
 *
 * Returns a palloc0'd FileSegInfo, or NULL when pg_aoseg has no row for
 * 'segno'. Raises an ERROR when eof, tupcount or varblockcount is NULL,
 * or when the stored eof is negative. A NULL eofuncompressed is allowed
 * and is reported as InvalidUncompressedEof.
 *
 * The lookup uses the segno index (aoEntry->segidxid) unless Gp_role is
 * GP_ROLE_EXECUTE, in which case the scan is started with indexOK = FALSE.
 *
 * NOTE: the scan runs with SnapshotNow; the appendOnlyMetaDataSnapshot
 * argument is not consulted by this function.
 *
 * If a caller intends to append to this file segment entry they must
 * already hold a relation Append-Only segment file (transaction-scope) lock (tag
 * LOCKTAG_RELATION_APPENDONLY_SEGMENT_FILE) in order to guarantee
 * stability of the pg_aoseg information on this segment file and exclusive right
 * to append data to the segment file.
 */
FileSegInfo *
GetFileSegInfo(Relation parentrel, AppendOnlyEntry *aoEntry, Snapshot appendOnlyMetaDataSnapshot, int segno)
{
	Relation	pg_aoseg_rel;
	TupleDesc	pg_aoseg_dsc;
	HeapTuple	tuple;
	ScanKeyData key[1];
	SysScanDesc aoscan;
	Datum		eof, eof_uncompressed, tupcount, varbcount;
	bool		isNull;
	bool		indexOK;
	Oid			indexid;
	FileSegInfo *fsinfo;

	/*
	 * Check the pg_aoseg relation to be certain the ao table segment file
	 * is there.
	 */
	pg_aoseg_rel = heap_open(aoEntry->segrelid, AccessShareLock);
	pg_aoseg_dsc = RelationGetDescr(pg_aoseg_rel);

	if (Gp_role == GP_ROLE_EXECUTE)
	{
		indexOK = FALSE;
		indexid = InvalidOid;
	}
	else
	{
		indexOK = TRUE;
		indexid = aoEntry->segidxid;
	}

	/*
	 * Setup a scan key to fetch from the index by segno.
	 */
	ScanKeyInit(&key[0], (AttrNumber) Anum_pg_aoseg_segno,
				BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(segno));

	aoscan = systable_beginscan(pg_aoseg_rel, indexid, indexOK, SnapshotNow, 1,
								&key[0]);

	tuple = systable_getnext(aoscan);

	if (!HeapTupleIsValid(tuple))
	{
		/* This segment file does not have an entry. */
		systable_endscan(aoscan);
		heap_close(pg_aoseg_rel, AccessShareLock);
		return NULL;
	}

	/*
	 * Read every attribute directly from the scan tuple while the scan is
	 * still open and convert it to int64 immediately. This avoids making a
	 * private heap_copytuple() copy that would never be freed.
	 */
	fsinfo = (FileSegInfo *) palloc0(sizeof(FileSegInfo));
	fsinfo->segno = segno;
	ItemPointerSetInvalid(&fsinfo->sequence_tid);

	/* get the eof */
	eof = fastgetattr(tuple, Anum_pg_aoseg_eof, pg_aoseg_dsc, &isNull);
	if (isNull)
		ereport(ERROR,
				(errcode(ERRCODE_UNDEFINED_OBJECT),
				 errmsg("got invalid eof value: NULL")));
	fsinfo->eof = (int64)DatumGetFloat8(eof);

	/* get the tupcount */
	tupcount = fastgetattr(tuple, Anum_pg_aoseg_tupcount, pg_aoseg_dsc, &isNull);
	if (isNull)
		ereport(ERROR,
				(errcode(ERRCODE_UNDEFINED_OBJECT),
				 errmsg("got invalid tupcount value: NULL")));
	fsinfo->tupcount = (int64)DatumGetFloat8(tupcount);

	/* get the varblock count */
	varbcount = fastgetattr(tuple, Anum_pg_aoseg_varblockcount, pg_aoseg_dsc, &isNull);
	if (isNull)
		ereport(ERROR,
				(errcode(ERRCODE_UNDEFINED_OBJECT),
				 errmsg("got invalid varblockcount value: NULL")));
	fsinfo->varblockcount = (int64)DatumGetFloat8(varbcount);

	/*
	 * get the uncompressed eof
	 *
	 * NULL is allowed. Tables that were created before the release of the
	 * eof_uncompressed catalog column will have a NULL instead of a value.
	 */
	eof_uncompressed = fastgetattr(tuple, Anum_pg_aoseg_eofuncompressed, pg_aoseg_dsc, &isNull);
	if (isNull)
		fsinfo->eof_uncompressed = InvalidUncompressedEof;
	else
		fsinfo->eof_uncompressed = (int64)DatumGetFloat8(eof_uncompressed);

	/* The scan tuple is no longer needed past this point. */
	systable_endscan(aoscan);

	if (fsinfo->eof < 0)
		ereport(ERROR,
				(errcode(ERRCODE_GP_INTERNAL_ERROR),
				 errmsg("Invalid eof " INT64_FORMAT " for relation %s",
						fsinfo->eof, RelationGetRelationName(parentrel))));

	/* Close appendonly catalog. */
	heap_close(pg_aoseg_rel, AccessShareLock);

	return fsinfo;
}
/*
 * GetAllFileSegInfo
 *
 * Return every pg_aoseg_* catalog entry of an appendonly relation, as seen
 * through appendOnlyMetaDataSnapshot. This is basically a physical
 * snapshot that a scanner can use to scan all the data in a local segment
 * database.
 *
 * The pg_aoseg relation is opened with AccessShareLock for the duration of
 * the call; the actual work is done by GetAllFileSegInfo_pg_aoseg_rel with
 * no segment-number filter (-1). The number of entries is stored in
 * *totalsegs.
 */
FileSegInfo **GetAllFileSegInfo(Relation parentrel,
								AppendOnlyEntry *aoEntry,
								Snapshot appendOnlyMetaDataSnapshot,
								int *totalsegs)
{
	Relation	segrel = heap_open(aoEntry->segrelid, AccessShareLock);
	FileSegInfo **seginfos;

	seginfos = GetAllFileSegInfo_pg_aoseg_rel(RelationGetRelationName(parentrel),
											  aoEntry,
											  segrel,
											  appendOnlyMetaDataSnapshot,
											  -1,	/* no segno filter */
											  totalsegs);

	heap_close(segrel, AccessShareLock);

	return seginfos;
}
/*
 * GetAllFileSegInfoWithSegno
 *
 * Same as GetAllFileSegInfo, except that 'segno' is handed to
 * GetAllFileSegInfo_pg_aoseg_rel as the expected segment number instead
 * of -1. The number of entries found is stored in *totalsegs.
 */
FileSegInfo **GetAllFileSegInfoWithSegno(Relation parentrel,
										 AppendOnlyEntry *aoEntry,
										 Snapshot appendOnlyMetaDataSnapshot,
										 int segno,
										 int *totalsegs)
{
	Relation	segrel = heap_open(aoEntry->segrelid, AccessShareLock);
	FileSegInfo **seginfos;

	seginfos = GetAllFileSegInfo_pg_aoseg_rel(RelationGetRelationName(parentrel),
											  aoEntry,
											  segrel,
											  appendOnlyMetaDataSnapshot,
											  segno,
											  totalsegs);

	heap_close(segrel, AccessShareLock);

	return seginfos;
}
/*
* The comparison routine that sorts an array of FileSegInfos
* in the ascending order of the segment number.
*/
static int
aoFileSegInfoCmp(const void *left, const void *right)
{
FileSegInfo *leftSegInfo = *((FileSegInfo **)left);
FileSegInfo *rightSegInfo = *((FileSegInfo **)right);
if (leftSegInfo->segno < rightSegInfo->segno)
return -1;
if (leftSegInfo->segno > rightSegInfo->segno)
return 1;
return 0;
}
/*
 * GetAllFileSegInfo_pg_aoseg_rel
 *
 * Scan the already-opened pg_aoseg relation 'pg_aoseg_rel' under
 * 'appendOnlyMetaDataSnapshot' and build one FileSegInfo per tuple found.
 *
 * When expectedSegno >= 0 the scan is restricted to tuples whose segno
 * equals it; otherwise all tuples are returned. The scan does not use an
 * index.
 *
 * On GP_ROLE_DISPATCH the tuple visibility summary of each catalog tuple
 * is captured in the entry as well.
 *
 * Returns a palloc'd array of palloc'd entries sorted by ascending segno,
 * with the entry count stored in *totalsegs. Returns NULL (and sets
 * *totalsegs to 0) when no tuple qualifies.
 *
 * relationName and aoEntry are not referenced by this implementation.
 */
FileSegInfo **GetAllFileSegInfo_pg_aoseg_rel(
								char *relationName,
								AppendOnlyEntry *aoEntry,
								Relation pg_aoseg_rel,
								Snapshot appendOnlyMetaDataSnapshot,
								int expectedSegno,
								int *totalsegs)
{
	TupleDesc	pg_aoseg_dsc;
	HeapTuple	tuple;
	SysScanDesc aoscan;
	ScanKeyData key[2];
	FileSegInfo **allseginfo;
	FileSegInfo *oneseginfo;
	int			seginfo_no, numOfKey = 0;
	int			seginfo_slot_no = AO_FILESEGINFO_ARRAY_SIZE;
	Datum		segno,
				eof,
				eof_uncompressed,
				tupcount,
				varblockcount;
	bool		isNull;

	pg_aoseg_dsc = RelationGetDescr(pg_aoseg_rel);

	/* MPP-16407:
	 * Initialize the segment file information array with
	 * AO_FILESEGINFO_ARRAY_SIZE slots; the array is dynamically expanded
	 * later if necessary.
	 */
	allseginfo = (FileSegInfo **) palloc0(sizeof(FileSegInfo*) * seginfo_slot_no);
	seginfo_no = 0;

	if (expectedSegno >= 0)
	{
		ScanKeyInit(&key[numOfKey++], (AttrNumber) Anum_pg_aoseg_segno,
					BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(expectedSegno));
	}

	/*
	 * Now get the actual segfile information
	 */
	aoscan = systable_beginscan(pg_aoseg_rel, InvalidOid, FALSE,
								appendOnlyMetaDataSnapshot, numOfKey, &key[0]);

	while (HeapTupleIsValid(tuple = systable_getnext(aoscan)))
	{
		/* dynamically expand space for FileSegInfo* array */
		if (seginfo_no >= seginfo_slot_no)
		{
			seginfo_slot_no *= 2;
			allseginfo = (FileSegInfo **) repalloc(allseginfo, sizeof(FileSegInfo*) * seginfo_slot_no);
		}

		/* every catalog tuple gets its own freshly zeroed entry */
		oneseginfo = (FileSegInfo *)palloc0(sizeof(FileSegInfo));
		allseginfo[seginfo_no] = oneseginfo;

		if (Gp_role == GP_ROLE_DISPATCH)
		{
			GetTupleVisibilitySummary(
								tuple,
								&oneseginfo->tupleVisibilitySummary);
		}

		segno = fastgetattr(tuple, Anum_pg_aoseg_segno, pg_aoseg_dsc, &isNull);
		oneseginfo->segno = DatumGetInt32(segno);

		eof = fastgetattr(tuple, Anum_pg_aoseg_eof, pg_aoseg_dsc, &isNull);
		oneseginfo->eof = (int64)DatumGetFloat8(eof);

		tupcount = fastgetattr(tuple, Anum_pg_aoseg_tupcount, pg_aoseg_dsc, &isNull);
		oneseginfo->tupcount = (int64)DatumGetFloat8(tupcount);

		varblockcount = fastgetattr(tuple, Anum_pg_aoseg_varblockcount, pg_aoseg_dsc, &isNull);
		oneseginfo->varblockcount = (int64)DatumGetFloat8(varblockcount);

		ItemPointerSetInvalid(&oneseginfo->sequence_tid);

		/*
		 * A NULL eofuncompressed is reported as InvalidUncompressedEof.
		 * Otherwise store the uncompressed eof itself; this used to copy
		 * the compressed eof by mistake.
		 */
		eof_uncompressed = fastgetattr(tuple, Anum_pg_aoseg_eofuncompressed, pg_aoseg_dsc, &isNull);
		if (isNull)
			oneseginfo->eof_uncompressed = InvalidUncompressedEof;
		else
			oneseginfo->eof_uncompressed = (int64)DatumGetFloat8(eof_uncompressed);

		seginfo_no++;

		CHECK_FOR_INTERRUPTS();
	}

	systable_endscan(aoscan);

	*totalsegs = seginfo_no;

	if (*totalsegs == 0)
	{
		pfree(allseginfo);
		return NULL;
	}

	/*
	 * Sort allseginfo by the order of segment file number.
	 *
	 * Currently this is only needed when building a bitmap index to guarantee the tids
	 * are in the ascending order. But since this array is pretty small, we just sort
	 * the array for all cases.
	 */
	qsort((char *)allseginfo, *totalsegs, sizeof(FileSegInfo *), aoFileSegInfoCmp);

	return allseginfo;
}
/*
* UpdateFileSegInfo
*
* Update the pg_aoseg entry of segment file 'segno' of an append only table.
*
* eof and eof_uncompressed are stored as given (they replace the current
* column values). tuples_added and varblocks_added are added to the current
* tupcount and varblockcount; tuples_added may be negative.
*
* Must not be called with Gp_role == GP_ROLE_EXECUTE (Insist). When Gp_role
* is not GP_ROLE_DISPATCH, the caller must already hold the AccessExclusiveLock
* on the segment file, otherwise an ERROR is raised.
*
* Raises an ERROR when no pg_aoseg entry exists for 'segno', or when the
* current tupcount or varblockcount is NULL.
*/
void
UpdateFileSegInfo(Relation parentrel,
AppendOnlyEntry *aoEntry,
int segno,
int64 eof,
int64 eof_uncompressed,
int64 tuples_added,
int64 varblocks_added)
{
LockAcquireResult acquireResult;
Relation pg_aoseg_rel;
TupleDesc pg_aoseg_dsc;
ScanKeyData key[1];
SysScanDesc aoscan;
HeapTuple tuple, new_tuple;
Datum filetupcount;
Datum filevarblockcount;
Datum new_tuple_count;
Datum new_varblock_count;
Datum *new_record;
bool *new_record_nulls;
bool *new_record_repl;
bool isNull;
/* overflow sanity checks. don't check the same for tuples_added,
* it may be coming as a negative diff from gp_update_ao_master_stats */
Assert(varblocks_added >= 0);
Assert(eof >= 0);
Insist(Gp_role != GP_ROLE_EXECUTE);
elog(DEBUG3, "UpdateFileSegInfo called. segno = %d", segno);
if (Gp_role != GP_ROLE_DISPATCH)
{
/*
* Verify we already have the write-lock! Re-requesting the lock must
* report LOCKACQUIRE_ALREADY_HELD; any other result is an error.
*/
acquireResult = LockRelationAppendOnlySegmentFile(
&parentrel->rd_node,
segno,
AccessExclusiveLock,
/* dontWait */ false);
if (acquireResult != LOCKACQUIRE_ALREADY_HELD)
{
elog(ERROR, "Should already have the (transaction-scope) write-lock on Append-Only segment file #%d, "
"relation %s", segno, RelationGetRelationName(parentrel));
}
}
/*
* Open the aoseg relation. Its index is not opened here; it is handed
* to systable_beginscan below by OID.
*/
pg_aoseg_rel = heap_open(aoEntry->segrelid, RowExclusiveLock);
pg_aoseg_dsc = pg_aoseg_rel->rd_att;
/*
* Setup a scan key to fetch from the index by segno.
*/
ScanKeyInit(&key[0], (AttrNumber) Anum_pg_aoseg_segno,
BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(segno));
aoscan = systable_beginscan(pg_aoseg_rel, aoEntry->segidxid, TRUE,
SnapshotNow, 1, &key[0]);
tuple = systable_getnext(aoscan);
if (!HeapTupleIsValid(tuple))
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT),
errmsg("append-only table \"%s\" file segment \"%d\" entry "
"does not exist", RelationGetRelationName(parentrel),
segno)));
/*
* Zero-filled work arrays for heap_modify_tuple: no replacement value is
* NULL, and only the columns flagged in new_record_repl are replaced.
*/
new_record = palloc0(sizeof(Datum) * pg_aoseg_dsc->natts);
new_record_nulls = palloc0(sizeof(bool) * pg_aoseg_dsc->natts);
new_record_repl = palloc0(sizeof(bool) * pg_aoseg_dsc->natts);
/* get the current tuple count so we can add to it */
filetupcount = fastgetattr(tuple,
Anum_pg_aoseg_tupcount,
pg_aoseg_dsc,
&isNull);
if(isNull)
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT),
errmsg("got invalid pg_aoseg filetupcount value: NULL")));
/* calculate the new tuple count */
new_tuple_count = DirectFunctionCall2(float8pl,
filetupcount,
Float8GetDatum((float8)tuples_added));
/* get the current varblock count so we can add to it */
filevarblockcount = fastgetattr(tuple,
Anum_pg_aoseg_varblockcount,
pg_aoseg_dsc,
&isNull);
if(isNull)
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT),
errmsg("got invalid pg_aoseg varblockcount value: NULL")));
/* calculate the new varblock count */
new_varblock_count = DirectFunctionCall2(float8pl,
filevarblockcount,
Float8GetDatum((float8)varblocks_added));
/*
* Build a tuple to update: eof and eofuncompressed take the caller's
* values, tupcount and varblockcount take the sums computed above.
*/
new_record[Anum_pg_aoseg_eof - 1] = Float8GetDatum((float8)eof);
new_record_repl[Anum_pg_aoseg_eof - 1] = true;
new_record[Anum_pg_aoseg_tupcount - 1] = new_tuple_count;
new_record_repl[Anum_pg_aoseg_tupcount - 1] = true;
new_record[Anum_pg_aoseg_varblockcount - 1] = new_varblock_count;
new_record_repl[Anum_pg_aoseg_varblockcount - 1] = true;
new_record[Anum_pg_aoseg_eofuncompressed - 1] = Float8GetDatum((float8)eof_uncompressed);
new_record_repl[Anum_pg_aoseg_eofuncompressed - 1] = true;
/*
* update the tuple in the pg_aoseg table
*/
new_tuple = heap_modify_tuple(tuple, pg_aoseg_dsc, new_record,
new_record_nulls, new_record_repl);
simple_heap_update(pg_aoseg_rel, &tuple->t_self, new_tuple);
CatalogUpdateIndexes(pg_aoseg_rel, new_tuple);
heap_freetuple(new_tuple);
/* Finish up scan */
systable_endscan(aoscan);
heap_close(pg_aoseg_rel, RowExclusiveLock);
pfree(new_record);
pfree(new_record_nulls);
pfree(new_record_repl);
}
/*
 * GetSegFilesTotals
 *
 * Sum up eof, tupcount and varblockcount over every pg_aoseg entry of an
 * AO row table visible under appendOnlyMetaDataSnapshot, and count the
 * entries. Returns a palloc0'd FileSegTotals.
 *
 * For the uncompressed total: an entry whose eofuncompressed is NULL sets
 * the running total to InvalidUncompressedEof; any other entry adds its
 * value to the running total.
 */
FileSegTotals *GetSegFilesTotals(Relation parentrel, Snapshot appendOnlyMetaDataSnapshot)
{
	Relation	segrel;
	TupleDesc	segdesc;
	HeapTuple	segtup;
	SysScanDesc scan;
	FileSegTotals *totals;
	AppendOnlyEntry *aoEntry = NULL;

	Assert(RelationIsAoRows(parentrel)); /* doesn't fit for AO column store. should implement same for CO */

	aoEntry = GetAppendOnlyEntry(RelationGetRelid(parentrel), appendOnlyMetaDataSnapshot);

	totals = (FileSegTotals *) palloc0(sizeof(FileSegTotals));

	segrel = heap_open(aoEntry->segrelid, AccessShareLock);
	segdesc = RelationGetDescr(segrel);

	/* unkeyed scan without an index: visit every entry */
	scan = systable_beginscan(segrel, InvalidOid, FALSE,
							  appendOnlyMetaDataSnapshot, 0, NULL);

	for (;;)
	{
		Datum		d_eof;
		Datum		d_tupcount;
		Datum		d_varblocks;
		Datum		d_eof_uncompressed;
		bool		isNull;

		segtup = systable_getnext(scan);
		if (!HeapTupleIsValid(segtup))
			break;

		d_eof = fastgetattr(segtup, Anum_pg_aoseg_eof, segdesc, &isNull);
		d_tupcount = fastgetattr(segtup, Anum_pg_aoseg_tupcount, segdesc, &isNull);
		d_varblocks = fastgetattr(segtup, Anum_pg_aoseg_varblockcount, segdesc, &isNull);

		/* fetched last: isNull below refers to eofuncompressed only */
		d_eof_uncompressed = fastgetattr(segtup, Anum_pg_aoseg_eofuncompressed, segdesc, &isNull);

		if (!isNull)
			totals->totalbytesuncompressed += (int64)DatumGetFloat8(d_eof_uncompressed);
		else
			totals->totalbytesuncompressed = InvalidUncompressedEof;

		totals->totalbytes += (int64)DatumGetFloat8(d_eof);
		totals->totaltuples += (int64)DatumGetFloat8(d_tupcount);
		totals->totalvarblocks += (int64)DatumGetFloat8(d_varblocks);
		totals->totalfilesegs++;

		CHECK_FOR_INTERRUPTS();
	}

	systable_endscan(scan);
	heap_close(segrel, AccessShareLock);

	pfree(aoEntry);

	return totals;
}
/*
 * GetAOTotalBytes
 *
 * Get the total bytes for a specific AO table from the pg_aoseg table on master.
 *
 * In hawq, master keeps all segfile info in the pg_aoseg table,
 * therefore this yields the whole table size.
 *
 * The result is the sum of the eof column over every pg_aoseg entry
 * visible under appendOnlyMetaDataSnapshot.
 */
int64 GetAOTotalBytes(Relation parentrel, Snapshot appendOnlyMetaDataSnapshot)
{
	Relation	segrel;
	TupleDesc	segdesc;
	HeapTuple	segtup;
	SysScanDesc scan;
	int64		totalBytes = 0;
	AppendOnlyEntry *aoEntry;

	aoEntry = GetAppendOnlyEntry(RelationGetRelid(parentrel), appendOnlyMetaDataSnapshot);

	segrel = heap_open(aoEntry->segrelid, AccessShareLock);
	segdesc = RelationGetDescr(segrel);

	Assert (Gp_role != GP_ROLE_EXECUTE);

	/* unkeyed scan without an index: visit every entry */
	scan = systable_beginscan(segrel, InvalidOid, FALSE,
							  appendOnlyMetaDataSnapshot, 0, NULL);

	for (;;)
	{
		Datum		d_eof;
		bool		isNull;

		segtup = systable_getnext(scan);
		if (!HeapTupleIsValid(segtup))
			break;

		d_eof = fastgetattr(segtup, Anum_pg_aoseg_eof, segdesc, &isNull);
		Assert(!isNull);

		totalBytes += (int64)DatumGetFloat8(d_eof);

		CHECK_FOR_INTERRUPTS();
	}

	systable_endscan(scan);
	heap_close(segrel, AccessShareLock);

	pfree(aoEntry);

	return totalBytes;
}
PG_FUNCTION_INFO_V1(gp_aoseg_history);

extern Datum
gp_aoseg_history(PG_FUNCTION_ARGS);

/*
 * gp_aoseg_history
 *
 * Set-returning function. Argument 0 is the OID of an append-only row
 * relation. Emits one row per tuple found in the relation's pg_aoseg
 * table when scanned with SnapshotAny, i.e. including aborted and
 * in-progress ones.
 *
 * Each result row has 15 columns: 11 tuple-visibility columns (gp_tid ..
 * gp_visibility) followed by segno, tupcount, eof and eof_uncompressed.
 *
 * Raises an ERROR when Gp_role is not GP_ROLE_DISPATCH or when the
 * relation is not an append-only row relation.
 */
Datum
gp_aoseg_history(PG_FUNCTION_ARGS)
{
	/* an OID must be kept in an Oid, not in a (signed) int */
	Oid			aoRelOid = PG_GETARG_OID(0);

	typedef struct Context
	{
		Oid			aoRelOid;

		FileSegInfo **aoSegfileArray;

		int			totalAoSegFiles;

		int			segfileArrayIndex;
	} Context;

	FuncCallContext *funcctx;
	Context	   *context;

	if (Gp_role != GP_ROLE_DISPATCH)
	{
		ereport(ERROR,
				(errcode(ERRCODE_GP_COMMAND_ERROR),
				 errmsg("gp_aoseg_history cannot be called on segments.")));
	}

	if (SRF_IS_FIRSTCALL())
	{
		TupleDesc	tupdesc;
		MemoryContext oldcontext;
		Relation	aocsRel;
		AppendOnlyEntry *aoEntry;
		Relation	pg_aoseg_rel;

		/* create a function context for cross-call persistence */
		funcctx = SRF_FIRSTCALL_INIT();

		/*
		 * switch to memory context appropriate for multiple function
		 * calls
		 */
		oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);

		/* build tupdesc for result tuples */
		tupdesc = CreateTemplateTupleDesc(15, false);
		TupleDescInitEntry(tupdesc, (AttrNumber) 1, "gp_tid",
						   TIDOID, -1, 0);
		TupleDescInitEntry(tupdesc, (AttrNumber) 2, "gp_xmin",
						   INT4OID, -1, 0);
		TupleDescInitEntry(tupdesc, (AttrNumber) 3, "gp_xmin_status",
						   TEXTOID, -1, 0);
		TupleDescInitEntry(tupdesc, (AttrNumber) 4, "gp_xmin_commit_distrib_id",
						   TEXTOID, -1, 0);
		TupleDescInitEntry(tupdesc, (AttrNumber) 5, "gp_xmax",
						   INT4OID, -1, 0);
		TupleDescInitEntry(tupdesc, (AttrNumber) 6, "gp_xmax_status",
						   TEXTOID, -1, 0);
		TupleDescInitEntry(tupdesc, (AttrNumber) 7, "gp_xmax_commit_distrib_id",
						   TEXTOID, -1, 0);
		TupleDescInitEntry(tupdesc, (AttrNumber) 8, "gp_command_id",
						   INT4OID, -1, 0);
		TupleDescInitEntry(tupdesc, (AttrNumber) 9, "gp_infomask",
						   TEXTOID, -1, 0);
		TupleDescInitEntry(tupdesc, (AttrNumber) 10, "gp_update_tid",
						   TIDOID, -1, 0);
		TupleDescInitEntry(tupdesc, (AttrNumber) 11, "gp_visibility",
						   TEXTOID, -1, 0);
		TupleDescInitEntry(tupdesc, (AttrNumber) 12, "segno",
						   INT4OID, -1, 0);
		TupleDescInitEntry(tupdesc, (AttrNumber) 13, "tupcount",
						   INT8OID, -1, 0);
		TupleDescInitEntry(tupdesc, (AttrNumber) 14, "eof",
						   INT8OID, -1, 0);
		TupleDescInitEntry(tupdesc, (AttrNumber) 15, "eof_uncompressed",
						   INT8OID, -1, 0);

		funcctx->tuple_desc = BlessTupleDesc(tupdesc);

		/*
		 * Collect all the segment file entries that we will format and
		 * send out as a result set. The cross-call state lives in the
		 * multi-call memory context.
		 */
		context = (Context *) palloc(sizeof(Context));
		funcctx->user_fctx = (void *) context;

		context->aoRelOid = aoRelOid;

		aocsRel = heap_open(aoRelOid, NoLock);
		if (!RelationIsAoRows(aocsRel))
			ereport(ERROR,
					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
					 errmsg("'%s' is not an append-only row relation",
							RelationGetRelationName(aocsRel))));

		aoEntry = GetAppendOnlyEntry(aoRelOid, SnapshotNow);

		pg_aoseg_rel = heap_open(aoEntry->segrelid, NoLock);

		/* SnapshotAny: get ALL tuples from pg_aoseg_% including aborted and in-progress ones. */
		context->aoSegfileArray =
			GetAllFileSegInfo_pg_aoseg_rel(
								RelationGetRelationName(aocsRel),
								aoEntry,
								pg_aoseg_rel,
								SnapshotAny,
								-1,
								&context->totalAoSegFiles);

		heap_close(pg_aoseg_rel, NoLock);
		heap_close(aocsRel, NoLock);

		/* Iteration position. */
		context->segfileArrayIndex = 0;

		MemoryContextSwitchTo(oldcontext);
	}

	funcctx = SRF_PERCALL_SETUP();
	context = (Context *) funcctx->user_fctx;

	/*
	 * Emit the next segment file entry, if any is left. The index check
	 * also covers the case where no entry was found and aoSegfileArray is
	 * NULL.
	 */
	if (context->segfileArrayIndex < context->totalAoSegFiles)
	{
		Datum		values[15];
		bool		nulls[15];
		HeapTuple	tuple;
		Datum		result;
		struct FileSegInfo *aoSegfile;

		/*
		 * Form tuple with appropriate data.
		 */
		MemSet(values, 0, sizeof(values));
		MemSet(nulls, 0, sizeof(nulls));

		aoSegfile = context->aoSegfileArray[context->segfileArrayIndex];

		/* presumably fills the leading visibility columns (0..10) -- confirm */
		GetTupleVisibilitySummaryDatums(
								&values[0],
								&nulls[0],
								&aoSegfile->tupleVisibilitySummary);

		values[11] = Int32GetDatum(aoSegfile->segno);
		values[12] = Int64GetDatum(aoSegfile->tupcount);
		values[13] = Int64GetDatum(aoSegfile->eof);
		values[14] = Int64GetDatum(aoSegfile->eof_uncompressed);

		tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
		result = HeapTupleGetDatum(tuple);

		/* Indicate we emitted one segment file. */
		context->segfileArrayIndex++;

		SRF_RETURN_NEXT(funcctx, result);
	}

	SRF_RETURN_DONE(funcctx);
}
/* Convert a text pointer to a C string via textout. Not used by the function below. */
#define GET_STR(textp) DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(textp)))
/*
* gp_update_aorow_master_stats_internal
*
* Runs "select segno,sum(tupcount) from gp_dist_random('pg_aoseg.<name>')
* group by (segno)" through SPI, where <name> is the name of the relation's
* pg_aoseg table. For every returned segno the summed tupcount is compared
* with the tupcount of the local pg_aoseg entry (GetFileSegInfo); when they
* differ, the local entry is adjusted through UpdateFileSegInfo by the
* difference.
*
* For every returned segno an AccessExclusiveLock on that segment file is
* requested through LockRelationAppendOnlySegmentFile and is not released
* here.
*
* Returns, as a float8 Datum, the sum of all tupcounts returned by the query.
*/
static Datum
gp_update_aorow_master_stats_internal(Relation parentrel, Snapshot appendOnlyMetaDataSnapshot)
{
StringInfoData sqlstmt;
bool connected = false;
char *aoseg_relname;
int proc;
int ret;
int64 total_count = 0;
/* remembered so that per-row work can run outside SPI's memory context */
MemoryContext oldcontext = CurrentMemoryContext;
AppendOnlyEntry *aoEntry = NULL;
aoEntry = GetAppendOnlyEntry(RelationGetRelid(parentrel), appendOnlyMetaDataSnapshot);
Assert(aoEntry != NULL);
/*
* get the name of the aoseg relation
*/
aoseg_relname = get_rel_name(aoEntry->segrelid);
if (NULL == aoseg_relname)
elog(ERROR, "failed to get relname for AO file segment");
/*
* assemble our query string
*/
initStringInfo(&sqlstmt);
appendStringInfo(&sqlstmt, "select segno,sum(tupcount) "
"from gp_dist_random('pg_aoseg.%s') "
"group by (segno)", aoseg_relname);
PG_TRY();
{
if (SPI_OK_CONNECT != SPI_connect())
{
ereport(ERROR, (errcode(ERRCODE_CDB_INTERNAL_ERROR),
errmsg("Unable to obtain AO relation information from segment databases."),
errdetail("SPI_connect failed in gp_update_ao_master_stats")));
}
/* from here on PG_CATCH must call SPI_finish */
connected = true;
/* Do the query. */
ret = SPI_execute(sqlstmt.data, false, 0);
proc = SPI_processed;
/* NOTE(review): a failed query (ret <= 0) is silently treated as "no rows" and 0 is returned -- confirm intended */
if (ret > 0 && SPI_tuptable != NULL)
{
TupleDesc tupdesc = SPI_tuptable->tupdesc;
SPITupleTable *tuptable = SPI_tuptable;
int i;
/*
* Iterate through each result tuple
*/
for (i = 0; i < proc; i++)
{
HeapTuple tuple = tuptable->vals[i];
FileSegInfo *fsinfo = NULL;
int qe_segno;
int64 qe_tupcount;
char *val_segno;
char *val_tupcount;
MemoryContext cxt_save;
/*
* Get totals from QE's for a specific segment
*/
val_segno = SPI_getvalue(tuple, tupdesc, 1);
val_tupcount = SPI_getvalue(tuple, tupdesc, 2);
/* use our own context so that SPI won't free our stuff later */
cxt_save = MemoryContextSwitchTo(oldcontext);
/*
* Convert to desired data type
*/
qe_segno = pg_atoi(val_segno, sizeof(int32), 0);
qe_tupcount = (int64)DatumGetFloat8(DirectFunctionCall1(float8in,
CStringGetDatum(val_tupcount)));
total_count += qe_tupcount;
/*
* Get the numbers on the QD for this segment
*/
// CONSIDER: For integrity, we should lock ALL segment files first before
// executing the query. And, the query of the segments (the SPI_execute)
// and the update (UpdateFileSegInfo) should be in the same transaction.
//
// If there are concurrent Append-Only inserts, we can end up with
// the wrong answer...
//
// NOTE: This is a transaction scope lock that must be held until commit / abort.
//
LockRelationAppendOnlySegmentFile(
&parentrel->rd_node,
qe_segno,
AccessExclusiveLock,
/* dontWait */ false);
fsinfo = GetFileSegInfo(parentrel, aoEntry, appendOnlyMetaDataSnapshot, qe_segno);
if (fsinfo == NULL)
{
/* fails in assert-enabled builds; otherwise a fresh entry is created */
Assert(!"master should dispatch seginfo to all QE");
InsertInitialSegnoEntry(aoEntry, qe_segno);
fsinfo = NewFileSegInfo(qe_segno);
}
/*
* check if numbers match.
* NOTE: proper way is to use int8eq() but since we
* don't expect any NAN's in here better do it directly
*/
if(fsinfo->tupcount != qe_tupcount)
{
int64 diff = qe_tupcount - fsinfo->tupcount;
/* the (int) cast only affects the value shown in the log message */
elog(DEBUG3, "gp_update_ao_master_stats: updating "
"segno %d with tupcount %d", qe_segno,
(int)qe_tupcount);
/*
* QD tup count != QE tup count. update QD count by
* passing in the diff (may be negative sometimes).
*
* NOTE(review): eof and eof_uncompressed are passed as 0.
* UpdateFileSegInfo, as written in this file, stores both
* values verbatim, so this call looks like it resets them
* to 0 -- confirm this is intended.
*/
UpdateFileSegInfo(parentrel, aoEntry, qe_segno, 0, 0, diff, 0);
}
else
elog(DEBUG3, "gp_update_ao_master_stats: no need to "
"update segno %d. it is synced", qe_segno);
pfree(fsinfo);
MemoryContextSwitchTo(cxt_save);
/*
* TODO: if an entry exists for this rel in the AO hash table
* need to also update that entry in shared memory. Need to
* figure out how to do this safely when concurrent operations
* are in progress. note that if no entry exists we are ok.
*
* At this point this doesn't seem too urgent as we generally
* only expect this function to update segno 0 only and the QD
* never cares about segment 0 anyway.
*/
}
}
/* cleared before SPI_finish so that PG_CATCH does not finish twice */
connected = false;
SPI_finish();
}
/* Clean up in case of error. */
PG_CATCH();
{
if (connected)
SPI_finish();
/* Carry on with error handling. */
PG_RE_THROW();
}
PG_END_TRY();
pfree(aoEntry);
pfree(sqlstmt.data);
PG_RETURN_FLOAT8((float8)total_count);
}
/*
 * gp_update_ao_master_stats
 *
 * This function is mainly created to handle cases where our product allowed
 * loading data into an append only table in utility mode, and as a result
 * the QD gets out of sync as to the number of rows in this table for each
 * segment. An example for this scenario is gp_restore. Running this function
 * puts the QD aoseg table back in sync.
 *
 * The relation is held open with RowExclusiveLock for the duration of the
 * call. Raises an ERROR if it is not an append-only row relation; otherwise
 * returns whatever gp_update_aorow_master_stats_internal returns.
 */
static Datum
gp_update_ao_master_stats_internal(Oid relid, Snapshot appendOnlyMetaDataSnapshot)
{
	Relation	aorel;
	Datum		tupleTotal;

	/* open the parent (main) relation */
	aorel = heap_open(relid, RowExclusiveLock);

	if (!RelationIsAoRows(aorel))
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg("'%s' is not an append-only relation",
						RelationGetRelationName(aorel))));

	/* only append-only row relations get here */
	tupleTotal = gp_update_aorow_master_stats_internal(aorel, appendOnlyMetaDataSnapshot);

	heap_close(aorel, RowExclusiveLock);

	return tupleTotal;
}
/*
 * gp_update_ao_master_stats_name
 *
 * SQL-callable entry point taking the relation name as text. It currently
 * always raises ERRCODE_FEATURE_NOT_SUPPORTED; the code after the ereport
 * is kept for when the feature is supported again.
 */
Datum
gp_update_ao_master_stats_name(PG_FUNCTION_ARGS)
{
	text	   *relname = PG_GETARG_TEXT_P(0);
	RangeVar   *rv;
	Oid			aoRelid;

	/*
	 * gp_update_ao_master_stats is used only in gp_restore which is not supported yet.
	 */
	ereport(ERROR,
			(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
			 errmsg("function gp_update_ao_master_stats not supported"),
			 errOmitLocation(true)));

	Assert(Gp_role != GP_ROLE_EXECUTE);

	/* resolve the (possibly qualified) name to a relation OID */
	rv = makeRangeVarFromNameList(textToQualifiedNameList(relname));
	aoRelid = RangeVarGetRelid(rv, false, false /*allowHcatalog*/);

	return gp_update_ao_master_stats_internal(aoRelid, SnapshotNow);
}
/*
 * gp_update_ao_master_stats_oid
 *
 * same as gp_update_ao_master_stats_name, but takes rel oid as argument.
 * It currently always raises ERRCODE_FEATURE_NOT_SUPPORTED.
 */
Datum
gp_update_ao_master_stats_oid(PG_FUNCTION_ARGS)
{
	Oid			aoRelid = PG_GETARG_OID(0);

	/*
	 * gp_update_ao_master_stats is used only in gp_restore which is not supported yet.
	 */
	ereport(ERROR,
			(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
			 errmsg("function gp_update_ao_master_stats not supported"),
			 errOmitLocation(true)));

	Assert(Gp_role != GP_ROLE_EXECUTE);

	return gp_update_ao_master_stats_internal(aoRelid, SnapshotNow);
}
/*
* Cross-call state of get_ao_distribution_oid: a cursor over the rows of
* the SPI result that is kept between calls.
*/
typedef struct
{
int index; /* position of the next SPI result row to return */
int rows; /* number of result rows (SPI_processed) */
} QueryInfo;
/**************************************************************
* get_ao_distribution_oid
* get_ao_distribution_name
*
* given an AO table name or oid, show the total distribution
* of rows across all segment databases in the system.
*
* TODO: for now these 2 functions are almost completely
* duplicated. See how to factor out a common internal function
* such as done in get_ao_compression_ratio below.
**************************************************************/
/*
 * get_ao_distribution_oid
 *
 * Set-returning function. Argument 0 is the OID of an append-only row
 * relation. Returns the rows of
 *   select gp_segment_id,sum(tupcount)
 *   from gp_dist_random('pg_aoseg.<aoseg relation>') group by (gp_segment_id)
 * one per call.
 *
 * The caller needs SELECT privilege on the relation. Raises an ERROR when
 * the relation is not an append-only row relation, when the aoseg relation
 * name cannot be found, or when SPI cannot connect or execute the query.
 *
 * The SPI connection opened on the first call stays open across calls,
 * because the result rows are read from SPI_tuptable; it is closed once
 * the last row has been returned.
 */
Datum
get_ao_distribution_oid(PG_FUNCTION_ARGS)
{
	FuncCallContext *funcctx;
	MemoryContext oldcontext;
	AclResult	aclresult;
	QueryInfo  *query_block = NULL;
	StringInfoData sqlstmt;
	Relation	parentrel;
	char	   *aoseg_relname;
	int			ret;
	Oid			relid = PG_GETARG_OID(0);

	Assert(Gp_role != GP_ROLE_EXECUTE);

	/*
	 * stuff done only on the first call of the function. In here we
	 * execute the query, gather the result rows and keep them in our
	 * context so that we could return them in the next calls to this func.
	 */
	if (SRF_IS_FIRSTCALL())
	{
		bool		connected = false;
		Oid			segrelid;

		funcctx = SRF_FIRSTCALL_INIT();

		/* open the parent (main) relation */
		parentrel = heap_open(relid, AccessShareLock);

		/*
		 * check permission to SELECT from this table (this function
		 * is effectively a form of COUNT(*) FROM TABLE
		 */
		aclresult = pg_class_aclcheck(parentrel->rd_id, GetUserId(),
									  ACL_SELECT);

		if (aclresult != ACLCHECK_OK)
			aclcheck_error(aclresult,
						   ACL_KIND_CLASS,
						   RelationGetRelationName(parentrel));

		/*
		 * verify this is an AO relation
		 */
		if (!RelationIsAoRows(parentrel))
			ereport(ERROR,
					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
					 errmsg("'%s' is not an append-only relation",
							RelationGetRelationName(parentrel))));

		GetAppendOnlyEntryAuxOids(RelationGetRelid(parentrel), SnapshotNow,
								  &segrelid, NULL, NULL, NULL);
		Assert(OidIsValid(segrelid));

		/*
		 * get the name of the aoseg relation
		 */
		aoseg_relname = get_rel_name(segrelid);
		if (NULL == aoseg_relname)
			elog(ERROR, "failed to get relname for AO file segment");

		/*
		 * assemble our query string (the relation is known to be AO rows
		 * at this point, so no further check is needed)
		 */
		initStringInfo(&sqlstmt);
		appendStringInfo(&sqlstmt, "select gp_segment_id,sum(tupcount) "
						 "from gp_dist_random('pg_aoseg.%s') "
						 "group by (gp_segment_id)", aoseg_relname);

		PG_TRY();
		{
			QueryInfo  *query_block_state = NULL;

			if (SPI_OK_CONNECT != SPI_connect())
			{
				ereport(ERROR, (errcode(ERRCODE_CDB_INTERNAL_ERROR),
								errmsg("Unable to obtain AO relation information from segment databases."),
								errdetail("SPI_connect failed in get_ao_distribution")));
			}
			connected = true;

			/* Do the query. */
			ret = SPI_execute(sqlstmt.data, false, 0);

			/*
			 * Without a result there is no cross-call state to set up, and
			 * the per-call code below would dereference a NULL user_fctx.
			 * Fail here instead; PG_CATCH closes the SPI connection.
			 */
			if (ret <= 0 || SPI_tuptable == NULL)
			{
				ereport(ERROR, (errcode(ERRCODE_CDB_INTERNAL_ERROR),
								errmsg("Unable to obtain AO relation information from segment databases."),
								errdetail("SPI_execute failed in get_ao_distribution")));
			}

			/* switch to memory context appropriate for multiple function calls */
			oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);

			funcctx->tuple_desc = BlessTupleDesc(SPI_tuptable->tupdesc);

			/*
			 * Allocate cross-call state, so that we can keep track of
			 * where we're at in the processing.
			 */
			query_block_state = (QueryInfo *) palloc0( sizeof(QueryInfo) );
			funcctx->user_fctx = (void *)query_block_state;

			query_block_state->index = 0;
			query_block_state->rows = SPI_processed;

			MemoryContextSwitchTo(oldcontext);
		}
		/* Clean up in case of error. */
		PG_CATCH();
		{
			if (connected)
				SPI_finish();

			/* Carry on with error handling. */
			PG_RE_THROW();
		}
		PG_END_TRY();

		pfree(sqlstmt.data);
		heap_close(parentrel, AccessShareLock);
	}

	/*
	 * Per-call operations
	 */
	funcctx = SRF_PERCALL_SETUP();

	query_block = (QueryInfo *)funcctx->user_fctx;
	if ( query_block->index < query_block->rows )
	{
		/*
		 * Get heaptuple from SPI, then deform it, and reform it using
		 * our tuple desc.
		 * If we don't do this, but rather try to pass the tuple from SPI
		 * directly back, we get an error because
		 * the tuple desc that is associated with the SPI call
		 * has not been blessed.
		 */
		HeapTuple	tuple = SPI_tuptable->vals[query_block->index++];
		TupleDesc	tupleDesc = funcctx->tuple_desc;

		Datum	   *values = (Datum *) palloc(tupleDesc->natts * sizeof(Datum));
		bool	   *nulls = (bool *) palloc(tupleDesc->natts * sizeof(bool));

		HeapTuple	res = NULL;
		Datum		result;

		heap_deform_tuple(tuple, tupleDesc, values, nulls);

		res = heap_form_tuple(tupleDesc, values, nulls );

		pfree(values);
		pfree(nulls);

		/* make the tuple into a datum */
		result = HeapTupleGetDatum(res);

		SRF_RETURN_NEXT(funcctx, result);
	}

	/*
	 * do when there is no more left
	 */
	pfree(query_block);

	SPI_finish();

	funcctx->user_fctx = NULL;

	SRF_RETURN_DONE(funcctx);
}
/*
 * get_ao_distribution_name
 *
 * Set-returning function.  Given the (optionally schema-qualified) name of
 * an append-only row relation, return one row per segment containing
 * (gp_segment_id, sum(tupcount)), as computed by a query over the
 * relation's pg_aoseg table through gp_dist_random().
 *
 * The first call checks SELECT permission, verifies the relation is an
 * append-only row relation, runs the query through SPI and stores a row
 * cursor (QueryInfo) in the multi-call memory context.  The SPI connection
 * is left open between calls because the result rows are read straight
 * from SPI_tuptable; it is closed on the final call, or by the PG_CATCH
 * handler if the first call fails after connecting.
 *
 * Errors are raised if the caller lacks SELECT privilege, if the relation
 * is not an append-only row relation, if the aoseg relation name cannot be
 * resolved, or if SPI fails to connect or to execute the query.
 */
Datum
get_ao_distribution_name(PG_FUNCTION_ARGS)
{
	FuncCallContext *funcctx;
	QueryInfo  *query_block = NULL;
	text	   *relname = PG_GETARG_TEXT_P(0);

	Assert(Gp_role != GP_ROLE_EXECUTE);

	/*
	 * stuff done only on the first call of the function. In here we
	 * execute the query, gather the result rows and keep them in our
	 * context so that we could return them in the next calls to this func.
	 */
	if (SRF_IS_FIRSTCALL())
	{
		/*
		 * Must be volatile: it is modified inside PG_TRY and inspected in
		 * PG_CATCH, i.e. after a longjmp.
		 */
		volatile bool connected = false;
		MemoryContext oldcontext;
		AclResult	aclresult;
		StringInfoData sqlstmt;
		RangeVar   *parentrv;
		Relation	parentrel;
		char	   *aoseg_relname;
		Oid			segrelid = InvalidOid;
		int			ret;

		funcctx = SRF_FIRSTCALL_INIT();

		parentrv = makeRangeVarFromNameList(textToQualifiedNameList(relname));

		/*
		 * The resolved oid itself is not needed here; the lookup is kept
		 * because the original code performed it before opening the
		 * relation (NOTE(review): presumably for its name validation /
		 * hcatalog handling -- confirm before removing).
		 */
		(void) RangeVarGetRelid(parentrv, false, true /*allowHcatalog*/);

		/* open the parent (main) relation */
		parentrel = heap_openrv(parentrv, AccessShareLock);

		/*
		 * check permission to SELECT from this table (this function
		 * is effectively a form of COUNT(*) FROM TABLE
		 */
		aclresult = pg_class_aclcheck(parentrel->rd_id, GetUserId(),
									  ACL_SELECT);
		if (aclresult != ACLCHECK_OK)
			aclcheck_error(aclresult,
						   ACL_KIND_CLASS,
						   RelationGetRelationName(parentrel));

		/*
		 * verify this is an AO relation
		 */
		if (!RelationIsAoRows(parentrel))
			ereport(ERROR,
					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
					 errmsg("'%s' is not an append-only relation",
							RelationGetRelationName(parentrel))));

		GetAppendOnlyEntryAuxOids(RelationGetRelid(parentrel), SnapshotNow,
								  &segrelid,
								  NULL, NULL, NULL);
		Assert(OidIsValid(segrelid));

		/*
		 * get the name of the aoseg relation
		 */
		aoseg_relname = get_rel_name(segrelid);
		if (NULL == aoseg_relname)
			elog(ERROR, "failed to get relname for AO file segment");

		/*
		 * assemble our query string
		 */
		initStringInfo(&sqlstmt);
		appendStringInfo(&sqlstmt, "select gp_segment_id,sum(tupcount) "
						 "from gp_dist_random('pg_aoseg.%s') "
						 "group by (gp_segment_id)", aoseg_relname);

		PG_TRY();
		{
			QueryInfo  *query_block_state;

			if (SPI_OK_CONNECT != SPI_connect())
			{
				ereport(ERROR, (errcode(ERRCODE_CDB_INTERNAL_ERROR),
								errmsg("Unable to obtain AO relation information from segment databases."),
								errdetail("SPI_connect failed in get_ao_distribution")));
			}
			connected = true;

			/* Do the query. */
			ret = SPI_execute(sqlstmt.data, false, 0);

			/*
			 * Without a result set there is no cross-call state to build,
			 * and the per-call code below would dereference a NULL
			 * user_fctx.  Fail cleanly instead; PG_CATCH closes SPI.
			 */
			if (ret <= 0 || SPI_tuptable == NULL)
			{
				ereport(ERROR, (errcode(ERRCODE_CDB_INTERNAL_ERROR),
								errmsg("Unable to obtain AO relation information from segment databases."),
								errdetail("SPI_execute failed in get_ao_distribution (return code %d)", ret)));
			}

			/* switch to memory context appropriate for multiple function calls */
			oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);

			funcctx->tuple_desc = BlessTupleDesc(SPI_tuptable->tupdesc);

			/*
			 * Allocate cross-call state, so that we can keep track of
			 * where we're at in the processing.
			 */
			query_block_state = (QueryInfo *) palloc0(sizeof(QueryInfo));
			funcctx->user_fctx = query_block_state;
			query_block_state->index = 0;
			query_block_state->rows = SPI_processed;

			MemoryContextSwitchTo(oldcontext);
		}
		/* Clean up in case of error. */
		PG_CATCH();
		{
			if (connected)
				SPI_finish();

			/* Carry on with error handling. */
			PG_RE_THROW();
		}
		PG_END_TRY();

		pfree(sqlstmt.data);
		heap_close(parentrel, AccessShareLock);
	}

	/*
	 * Per-call operations
	 */
	funcctx = SRF_PERCALL_SETUP();
	query_block = (QueryInfo *) funcctx->user_fctx;

	if (query_block->index < query_block->rows)
	{
		/*
		 * Get heaptuple from SPI, then deform it, and reform it using
		 * our tuple desc.
		 * If we don't do this, but rather try to pass the tuple from SPI
		 * directly back, we get an error because
		 * the tuple desc that is associated with the SPI call
		 * has not been blessed.
		 */
		HeapTuple	tuple = SPI_tuptable->vals[query_block->index++];
		TupleDesc	tupleDesc = funcctx->tuple_desc;
		Datum	   *values = (Datum *) palloc(tupleDesc->natts * sizeof(Datum));
		bool	   *nulls = (bool *) palloc(tupleDesc->natts * sizeof(bool));
		HeapTuple	res = NULL;
		Datum		result;

		heap_deform_tuple(tuple, tupleDesc, values, nulls);
		res = heap_form_tuple(tupleDesc, values, nulls);

		pfree(values);
		pfree(nulls);

		/* make the tuple into a datum */
		result = HeapTupleGetDatum(res);

		SRF_RETURN_NEXT(funcctx, result);
	}

	/*
	 * do when there is no more left
	 */
	pfree(query_block);
	SPI_finish();
	funcctx->user_fctx = NULL;

	SRF_RETURN_DONE(funcctx);
}
/**************************************************************
* get_ao_compression_ratio_oid
* get_ao_compression_ratio_name
*
* Given an append-only table name or oid calculate the effective
* compression ratio for this append only table stored data.
* If this info is not available (pre 3.3 created tables) then
* return -1.
**************************************************************/
Datum
get_ao_compression_ratio_name(PG_FUNCTION_ARGS)
{
RangeVar *parentrv;
text *relname = PG_GETARG_TEXT_P(0);
Oid relid;
/* Assert(Gp_role != GP_ROLE_EXECUTE); */
parentrv = makeRangeVarFromNameList(textToQualifiedNameList(relname));
relid = RangeVarGetRelid(parentrv, false, true /*allowHcatalog*/);
return ao_compression_ratio_internal(relid);
}
/*
 * get_ao_compression_ratio_oid
 *
 * Oid-based twin of get_ao_compression_ratio_name: argument 0 is the
 * relation oid, which is handed directly to
 * ao_compression_ratio_internal().
 */
Datum
get_ao_compression_ratio_oid(PG_FUNCTION_ARGS)
{
	/* Assert(Gp_role != GP_ROLE_EXECUTE); */

	return ao_compression_ratio_internal(PG_GETARG_OID(0));
}
/*
 * aorow_compression_ratio_internal
 *
 * Compute the compression ratio of an append-only row relation by scanning
 * its pg_aoseg relation and dividing the summed uncompressed eof by the
 * summed eof over all segment-file entries.
 *
 * parentrel: the already opened append-only parent relation.
 *
 * Returns a float8 Datum: the ratio rounded to two decimal places, or -1
 * when the summed eof is not greater than zero ("not available").
 */
static Datum
aorow_compression_ratio_internal(Relation parentrel)
{
	Relation	pg_aoseg_rel;
	TupleDesc	pg_aoseg_dsc;
	HeapTuple	tuple;
	HeapScanDesc aoscan;
	Datum		eof;
	Datum		eof_uncompressed;
	float8		total_eof = 0;
	float8		total_eof_uncompressed = 0;
	bool		isNull;
	AppendOnlyEntry *aoEntry = NULL;
	float8		compress_ratio = -1;	/* the default, meaning "not available" */

	Assert(AmIMaster());		/* Make sure this function is called in master */

	aoEntry = GetAppendOnlyEntry(RelationGetRelid(parentrel), SnapshotNow);
	Assert(aoEntry != NULL);

	pg_aoseg_rel = heap_open(aoEntry->segrelid, AccessShareLock);
	pg_aoseg_dsc = RelationGetDescr(pg_aoseg_rel);

	/* accumulate eof / eofuncompressed over every pg_aoseg entry */
	aoscan = heap_beginscan(pg_aoseg_rel, SnapshotNow, 0, NULL);
	while (HeapTupleIsValid(tuple = heap_getnext(aoscan, ForwardScanDirection)))
	{
		eof = fastgetattr(tuple, Anum_pg_aoseg_eof, pg_aoseg_dsc, &isNull);
		Assert(!isNull);

		eof_uncompressed = fastgetattr(tuple, Anum_pg_aoseg_eofuncompressed, pg_aoseg_dsc, &isNull);
		Assert(!isNull);

		total_eof += DatumGetFloat8(eof);
		total_eof_uncompressed += DatumGetFloat8(eof_uncompressed);

		CHECK_FOR_INTERRUPTS();
	}

	heap_endscan(aoscan);
	heap_close(pg_aoseg_rel, AccessShareLock);
	pfree(aoEntry);

	if (total_eof > 0)
	{
		/*
		 * Sized so that "%.2f" can never be truncated: a finite float8 has
		 * at most 309 integer digits, plus sign, ".dd" and the terminator.
		 * A truncated string would silently drop digits and make float8in
		 * parse a different (wrong) number.
		 */
		char		buf[320];

		/* calculate the compression ratio */
		float8		compress_ratio_raw = DatumGetFloat8(DirectFunctionCall2(float8div,
																			   Float8GetDatum(total_eof_uncompressed),
																			   Float8GetDatum(total_eof)));

		/* format to 2 digits past the decimal point */
		snprintf(buf, sizeof(buf), "%.2f", compress_ratio_raw);

		/* read the rounded text back as a float8 */
		compress_ratio = DatumGetFloat8(DirectFunctionCall1(float8in,
															CStringGetDatum(buf)));
	}

	PG_RETURN_FLOAT8(compress_ratio);
}
/*
 * ao_compression_ratio_internal
 *
 * Open the relation identified by relid under AccessShareLock, make sure
 * it is either an append-only row relation or a parquet relation, and
 * delegate to the matching storage-specific routine.  The relation is
 * closed again before the resulting Datum is returned.
 *
 * Raises ERRCODE_INVALID_PARAMETER_VALUE for any other kind of relation.
 */
static Datum
ao_compression_ratio_internal(Oid relid)
{
	Datum		ratio;
	Relation	rel = heap_open(relid, AccessShareLock);

	/* reject everything that is neither AO-rows nor parquet */
	if (!(RelationIsAoRows(rel) || RelationIsParquet(rel)))
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg("'%s' is not an append-only relation",
						RelationGetRelationName(rel))));

	ratio = RelationIsAoRows(rel)
		? aorow_compression_ratio_internal(rel)
		: parquet_compression_ration_internal(rel);

	heap_close(rel, AccessShareLock);

	return ratio;
}
/*
 * FreeAllSegFileInfo
 *
 * pfree each of the first totalSegFiles entries of allSegInfo.  Only the
 * entries are released here; the pointer array itself is not freed.
 */
void
FreeAllSegFileInfo(FileSegInfo **allSegInfo, int totalSegFiles)
{
	int			idx = 0;

	Assert(allSegInfo);

	while (idx < totalSegFiles)
	{
		Assert(allSegInfo[idx] != NULL);
		pfree(allSegInfo[idx]);
		idx++;
	}
}
/*
 * AOGetAllSegFileSplits
 *
 * Build the list of file splits for every segment file recorded in the
 * pg_aoseg relation of one append-only table.
 *
 * aoEntry:                     catalog entry of the table; supplies
 *                              segrelid, majorversion and splitsize.
 * appendOnlyMetaDataSnapshot:  snapshot used to scan pg_aoseg.
 *
 * For majorversion < 2 each segment file becomes exactly one split that
 * covers [0, eof).  Otherwise the file is cut into consecutive splits of
 * splitsize bytes, followed by one shorter split for any remainder (a file
 * with eof 0 then contributes no split at all).
 *
 * Returns a List of FileSplit nodes (NIL if nothing was found).  Raises an
 * error if a file has to be cut but splitsize is not positive, because the
 * cutting loop could never terminate in that case.
 */
List *
AOGetAllSegFileSplits(AppendOnlyEntry *aoEntry,
					  Snapshot appendOnlyMetaDataSnapshot)
{
	Relation	pg_aoseg_rel;
	TupleDesc	pg_aoseg_dsc;
	HeapTuple	tuple;
	SysScanDesc aoscan;
	List	   *splits = NIL;

	pg_aoseg_rel = heap_open(aoEntry->segrelid, AccessShareLock);
	pg_aoseg_dsc = RelationGetDescr(pg_aoseg_rel);

	aoscan = systable_beginscan(pg_aoseg_rel, InvalidOid, FALSE,
								appendOnlyMetaDataSnapshot, 0, NULL);

	while (HeapTupleIsValid(tuple = systable_getnext(aoscan)))
	{
		int64		file_size = (int64) DatumGetFloat8(fastgetattr(tuple, Anum_pg_aoseg_eof, pg_aoseg_dsc, NULL));
		int			segno = DatumGetInt32(fastgetattr(tuple, Anum_pg_aoseg_segno, pg_aoseg_dsc, NULL));

		if (aoEntry->majorversion < 2)
		{
			/* one split covering the whole file */
			FileSplit	split = makeNode(FileSplitNode);

			split->segno = segno;
			split->offsets = 0;
			split->lengths = file_size;
			splits = lappend(splits, split);
		}
		else
		{
			int64		remaining_size = file_size;
			int64		offset = 0;
			FileSplit	split = NULL;

			/*
			 * A non-positive split size would keep the loop below spinning
			 * forever (remaining_size never shrinks) while it appends
			 * splits until memory is exhausted.
			 */
			if (aoEntry->splitsize <= 0)
				elog(ERROR, "non-positive split size for append-only segment relation %u",
					 aoEntry->segrelid);

			/* full-sized splits */
			while (remaining_size >= aoEntry->splitsize)
			{
				split = makeNode(FileSplitNode);
				split->segno = segno;
				split->offsets = offset;
				split->lengths = aoEntry->splitsize;
				splits = lappend(splits, split);

				offset += aoEntry->splitsize;
				remaining_size -= aoEntry->splitsize;
			}

			/* trailing partial split, if any bytes are left */
			if (remaining_size > 0)
			{
				split = makeNode(FileSplitNode);
				split->segno = segno;
				split->offsets = offset;
				split->lengths = remaining_size;
				splits = lappend(splits, split);
			}
		}
	}

	systable_endscan(aoscan);
	heap_close(pg_aoseg_rel, AccessShareLock);

	return splits;
}
/*
 * AOFetchSegFileInfo
 *
 * For every pg_aoseg entry whose segment number matches an element of
 * segfileinfos, copy that entry's tuple count, varblock count, eof and
 * uncompressed eof into the first matching list element.
 *
 * aoEntry:                     supplies the oid of the pg_aoseg relation.
 * segfileinfos:                list of ResultRelSegFileInfo, each with its
 *                              segno already filled in.
 * appendOnlyMetaDataSnapshot:  snapshot used to scan pg_aoseg.
 *
 * The one-element eof / uncompressed_eof arrays are palloc'd in the
 * current memory context; numfiles is set to 1 for each filled element.
 */
void
AOFetchSegFileInfo(AppendOnlyEntry *aoEntry, List *segfileinfos, Snapshot appendOnlyMetaDataSnapshot)
{
	Relation	segrel;
	TupleDesc	segdesc;
	SysScanDesc scan;
	HeapTuple	row;

	/*
	 * This function is called on behalf of insert operations, which is
	 * why the relation is opened with RowExclusiveLock.
	 */
	segrel = heap_open(aoEntry->segrelid, RowExclusiveLock);
	segdesc = RelationGetDescr(segrel);

	scan = systable_beginscan(segrel, InvalidOid, FALSE,
							  appendOnlyMetaDataSnapshot, 0, NULL);

	for (row = systable_getnext(scan);
		 HeapTupleIsValid(row);
		 row = systable_getnext(scan))
	{
		int			row_segno = DatumGetInt32(fastgetattr(row, Anum_pg_aoseg_segno, segdesc, NULL));
		ListCell   *cell;

		foreach(cell, segfileinfos)
		{
			ResultRelSegFileInfo *info = (ResultRelSegFileInfo *) lfirst(cell);
			Datum		attr;

			Assert(info != NULL);

			/* not the segment file this catalog row describes */
			if (info->segno != row_segno)
				continue;

			info->numfiles = 1;

			attr = fastgetattr(row, Anum_pg_aoseg_tupcount, segdesc, NULL);
			info->tupcount = (int64) DatumGetFloat8(attr);

			attr = fastgetattr(row, Anum_pg_aoseg_varblockcount, segdesc, NULL);
			info->varblock = (int64) DatumGetFloat8(attr);

			info->eof = (int64 *) palloc(sizeof(int64));
			attr = fastgetattr(row, Anum_pg_aoseg_eof, segdesc, NULL);
			info->eof[0] = (int64) DatumGetFloat8(attr);

			info->uncompressed_eof = (int64 *) palloc(sizeof(int64));
			attr = fastgetattr(row, Anum_pg_aoseg_eofuncompressed, segdesc, NULL);
			info->uncompressed_eof[0] = (int64) DatumGetFloat8(attr);

			/* only the first matching list element is filled */
			break;
		}
	}

	systable_endscan(scan);
	heap_close(segrel, RowExclusiveLock);
}