/*-------------------------------------------------------------------------
*
* instrument.c
* functions for instrumentation of plan execution
*
*
* Portions Copyright (c) 2006-2009, Greenplum inc
* Portions Copyright (c) 2012-Present VMware, Inc. or its affiliates.
* Copyright (c) 2001-2023, PostgreSQL Global Development Group
*
* IDENTIFICATION
* src/backend/executor/instrument.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include <unistd.h>
#include "cdb/cdbvars.h"
#include "storage/spin.h"
#include "executor/instrument.h"
#include "utils/memutils.h"
#include "utils/timestamp.h"
#include "miscadmin.h"
#include "storage/shmem.h"
#include "cdb/cdbdtxcontextinfo.h"
#include "cdb/cdbtm.h"
BufferUsage pgBufferUsage;
static BufferUsage save_pgBufferUsage;
WalUsage pgWalUsage;
static WalUsage save_pgWalUsage;
static void BufferUsageAdd(BufferUsage *dst, const BufferUsage *add);
static void WalUsageAdd(WalUsage *dst, WalUsage *add);
/* GPDB specific */
static bool shouldPickInstrInShmem(NodeTag tag);
static Instrumentation *pickInstrFromShmem(const Plan *plan, int instrument_options);
static void instrShmemRecycleCallback(ResourceReleasePhase phase, bool isCommit,
bool isTopLevel, void *arg);
InstrumentationHeader *InstrumentGlobal = NULL;
static int scanNodeCounter = 0;
static int shmemNumSlots = -1;
static bool instrumentResownerCallbackRegistered = false;
static InstrumentationResownerSet *slotsOccupied = NULL;
static void InstrStopNodeAsync(Instrumentation *instr, uint64 nTuples);
static void InstrStopNodeSync(Instrumentation *instr, uint64 nTuples);
/* Allocate new instrumentation structure(s) */
Instrumentation *
InstrAlloc(int n, int instrument_options, bool async_mode)
{
Instrumentation *instr;
/* initialize all fields to zeroes, then modify as needed */
instr = palloc0(n * sizeof(Instrumentation));
if (instrument_options & (INSTRUMENT_BUFFERS | INSTRUMENT_TIMER | INSTRUMENT_WAL | INSTRUMENT_CDB))
{
bool need_buffers = (instrument_options & INSTRUMENT_BUFFERS) != 0;
bool need_wal = (instrument_options & INSTRUMENT_WAL) != 0;
bool need_timer = (instrument_options & INSTRUMENT_TIMER) != 0;
bool need_cdb = (instrument_options & INSTRUMENT_CDB) != 0;
int i;
for (i = 0; i < n; i++)
{
instr[i].need_bufusage = need_buffers;
instr[i].need_walusage = need_wal;
instr[i].need_timer = need_timer;
instr[i].need_cdb = need_cdb;
instr[i].async_mode = async_mode;
}
}
return instr;
}
/* Initialize a pre-allocated instrumentation structure. */
void
InstrInit(Instrumentation *instr, int instrument_options)
{
memset(instr, 0, sizeof(Instrumentation));
instr->need_bufusage = (instrument_options & INSTRUMENT_BUFFERS) != 0;
instr->need_walusage = (instrument_options & INSTRUMENT_WAL) != 0;
instr->need_timer = (instrument_options & INSTRUMENT_TIMER) != 0;
}
/* Entry to a plan node */
void
InstrStartNode(Instrumentation *instr)
{
if (instr->need_timer &&
!INSTR_TIME_SET_CURRENT_LAZY(instr->starttime))
elog(ERROR, "InstrStartNode called twice in a row");
/* save buffer usage totals at node entry, if needed */
if (instr->need_bufusage)
instr->bufusage_start = pgBufferUsage;
if (instr->need_walusage)
instr->walusage_start = pgWalUsage;
}
/* Exit from a plan node */
static void
InstrStopNodeSync(Instrumentation *instr, uint64 nTuples)
{
instr_time endtime;
instr_time starttime;
starttime = instr->starttime;
/* count the returned tuples */
instr->tuplecount += nTuples;
/* let's update the time only if the timer was requested */
if (instr->need_timer)
{
if (INSTR_TIME_IS_ZERO(instr->starttime))
elog(ERROR, "InstrStopNode called without start");
INSTR_TIME_SET_CURRENT_COARSE(endtime);
INSTR_TIME_ACCUM_DIFF(instr->counter, endtime, instr->starttime);
INSTR_TIME_SET_ZERO(instr->starttime);
}
/* Add delta of buffer usage since entry to node's totals */
if (instr->need_bufusage)
BufferUsageAccumDiff(&instr->bufusage,
&pgBufferUsage, &instr->bufusage_start);
if (instr->need_walusage)
WalUsageAccumDiff(&instr->walusage,
&pgWalUsage, &instr->walusage_start);
/* Is this the first tuple of this cycle? */
if (!instr->running)
{
instr->running = true;
instr->firsttuple = INSTR_TIME_GET_DOUBLE(instr->counter);
/* CDB: save this start time as the first start */
instr->firststart = starttime;
}
}
static void
InstrStopNodeAsync(Instrumentation *instr, uint64 nTuples)
{
uint64 save_tuplecount = instr->tuplecount;
InstrStopNodeSync(instr, nTuples);
if (instr->running)
{
/*
* In async mode, if the plan node hadn't emitted any tuples before,
* this might be the first tuple
*/
if (instr->async_mode && save_tuplecount < 1)
instr->firsttuple = INSTR_TIME_GET_DOUBLE(instr->counter);
}
}
void
InstrStopNode(Instrumentation *instr, uint64 nTuples)
{
if (unlikely(instr->async_mode))
InstrStopNodeAsync(instr, nTuples);
else
InstrStopNodeSync(instr, nTuples);
}
/* Update tuple count */
void
InstrUpdateTupleCount(Instrumentation *instr, uint64 nTuples)
{
/* count the returned tuples */
instr->tuplecount += nTuples;
}
/* Finish a run cycle for a plan node */
void
InstrEndLoop(Instrumentation *instr)
{
double totaltime;
/* Skip if nothing has happened, or already shut down */
if (!instr->running)
return;
if (!INSTR_TIME_IS_ZERO(instr->starttime))
elog(ERROR, "InstrEndLoop called on running node");
/* Accumulate per-cycle statistics into totals */
totaltime = INSTR_TIME_GET_DOUBLE(instr->counter);
/* CDB: Report startup time from only the first cycle. */
if (instr->nloops == 0)
instr->startup = instr->firsttuple;
instr->total += totaltime;
instr->ntuples += instr->tuplecount;
instr->nloops += 1;
/* Reset for next cycle (if any) */
instr->running = false;
INSTR_TIME_SET_ZERO(instr->starttime);
INSTR_TIME_SET_ZERO(instr->counter);
instr->firsttuple = 0;
instr->tuplecount = 0;
}
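/*
 * A minimal usage sketch for the node-level functions above, assuming a
 * hypothetical produce_tuple() tuple source; the real executor drives these
 * calls from its ExecProcNode wrappers and node shutdown paths:
 *
 *		Instrumentation *instr = InstrAlloc(1, INSTRUMENT_TIMER, false);
 *
 *		for (;;)
 *		{
 *			bool		got_tuple;
 *
 *			InstrStartNode(instr);
 *			got_tuple = produce_tuple();
 *			InstrStopNode(instr, got_tuple ? 1 : 0);
 *			if (!got_tuple)
 *				break;
 *		}
 *		InstrEndLoop(instr);
 *
 * InstrEndLoop() folds the finished cycle into instr->total, ntuples and
 * nloops, leaving the struct ready for the next loop, if any.
 */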
/* aggregate instrumentation information */
void
InstrAggNode(Instrumentation *dst, Instrumentation *add)
{
if (!dst->running && add->running)
{
dst->running = true;
dst->firsttuple = add->firsttuple;
}
else if (dst->running && add->running && dst->firsttuple > add->firsttuple)
dst->firsttuple = add->firsttuple;
INSTR_TIME_ADD(dst->counter, add->counter);
dst->tuplecount += add->tuplecount;
dst->startup += add->startup;
dst->total += add->total;
dst->ntuples += add->ntuples;
dst->ntuples2 += add->ntuples2;
dst->nloops += add->nloops;
dst->nfiltered1 += add->nfiltered1;
dst->nfiltered2 += add->nfiltered2;
	/* Add the other node's accumulated buffer and WAL usage to our totals */
if (dst->need_bufusage)
BufferUsageAdd(&dst->bufusage, &add->bufusage);
if (dst->need_walusage)
WalUsageAdd(&dst->walusage, &add->walusage);
}
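/*
 * For example (a sketch, with a hypothetical instrs[] array), a caller that
 * wants one combined Instrumentation for several workers or segments can
 * simply fold them together:
 *
 *		for (i = 0; i < n; i++)
 *			InstrAggNode(&total, &instrs[i]);
 */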
/* note current values during parallel executor startup */
void
InstrStartParallelQuery(void)
{
save_pgBufferUsage = pgBufferUsage;
save_pgWalUsage = pgWalUsage;
}
/* report usage after parallel executor shutdown */
void
InstrEndParallelQuery(BufferUsage *bufusage, WalUsage *walusage)
{
memset(bufusage, 0, sizeof(BufferUsage));
BufferUsageAccumDiff(bufusage, &pgBufferUsage, &save_pgBufferUsage);
memset(walusage, 0, sizeof(WalUsage));
WalUsageAccumDiff(walusage, &pgWalUsage, &save_pgWalUsage);
}
/* accumulate work done by workers in leader's stats */
void
InstrAccumParallelQuery(BufferUsage *bufusage, WalUsage *walusage)
{
BufferUsageAdd(&pgBufferUsage, bufusage);
WalUsageAdd(&pgWalUsage, walusage);
}
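/*
 * Sketch of the intended call pattern, roughly how the parallel executor
 * machinery uses these (shared_bufusage/shared_walusage stand in for the
 * per-worker arrays in dynamic shared memory):
 *
 *		in each worker, around running its plan fragment:
 *			InstrStartParallelQuery();
 *			... execute the plan ...
 *			InstrEndParallelQuery(&shared_bufusage[i], &shared_walusage[i]);
 *
 *		in the leader, once per worker after the workers finish:
 *			InstrAccumParallelQuery(&shared_bufusage[i], &shared_walusage[i]);
 */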
/* dst += add */
static void
BufferUsageAdd(BufferUsage *dst, const BufferUsage *add)
{
dst->shared_blks_hit += add->shared_blks_hit;
dst->shared_blks_read += add->shared_blks_read;
dst->shared_blks_dirtied += add->shared_blks_dirtied;
dst->shared_blks_written += add->shared_blks_written;
dst->local_blks_hit += add->local_blks_hit;
dst->local_blks_read += add->local_blks_read;
dst->local_blks_dirtied += add->local_blks_dirtied;
dst->local_blks_written += add->local_blks_written;
dst->temp_blks_read += add->temp_blks_read;
dst->temp_blks_written += add->temp_blks_written;
INSTR_TIME_ADD(dst->blk_read_time, add->blk_read_time);
INSTR_TIME_ADD(dst->blk_write_time, add->blk_write_time);
INSTR_TIME_ADD(dst->temp_blk_read_time, add->temp_blk_read_time);
INSTR_TIME_ADD(dst->temp_blk_write_time, add->temp_blk_write_time);
}
/* dst += add - sub */
void
BufferUsageAccumDiff(BufferUsage *dst,
const BufferUsage *add,
const BufferUsage *sub)
{
dst->shared_blks_hit += add->shared_blks_hit - sub->shared_blks_hit;
dst->shared_blks_read += add->shared_blks_read - sub->shared_blks_read;
dst->shared_blks_dirtied += add->shared_blks_dirtied - sub->shared_blks_dirtied;
dst->shared_blks_written += add->shared_blks_written - sub->shared_blks_written;
dst->local_blks_hit += add->local_blks_hit - sub->local_blks_hit;
dst->local_blks_read += add->local_blks_read - sub->local_blks_read;
dst->local_blks_dirtied += add->local_blks_dirtied - sub->local_blks_dirtied;
dst->local_blks_written += add->local_blks_written - sub->local_blks_written;
dst->temp_blks_read += add->temp_blks_read - sub->temp_blks_read;
dst->temp_blks_written += add->temp_blks_written - sub->temp_blks_written;
INSTR_TIME_ACCUM_DIFF(dst->blk_read_time,
add->blk_read_time, sub->blk_read_time);
INSTR_TIME_ACCUM_DIFF(dst->blk_write_time,
add->blk_write_time, sub->blk_write_time);
INSTR_TIME_ACCUM_DIFF(dst->temp_blk_read_time,
add->temp_blk_read_time, sub->temp_blk_read_time);
INSTR_TIME_ACCUM_DIFF(dst->temp_blk_write_time,
add->temp_blk_write_time, sub->temp_blk_write_time);
}
/* Calculate the number of slots from gp_instrument_shmem_size */
Size
InstrShmemNumSlots(void)
{
	if (shmemNumSlots < 0)
	{
		shmemNumSlots = (int) (gp_instrument_shmem_size * 1024 - sizeof(InstrumentationHeader)) / sizeof(InstrumentationSlot);
		shmemNumSlots = (shmemNumSlots < 0) ? 0 : shmemNumSlots;
	}
return shmemNumSlots;
}
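/*
 * Purely illustrative arithmetic with assumed sizes: if gp_instrument_shmem_size
 * were 1024 (kB), the header 64 bytes and each InstrumentationSlot 512 bytes,
 * InstrShmemNumSlots() would return (1024 * 1024 - 64) / 512 = 2047 slots.
 */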
/* Compute the shmem space needed for the header and an array of Instrumentation slots */
Size
InstrShmemSize(void)
{
Size size = 0;
Size number_slots;
	/* If started in utility mode, disallow Instrumentation in shmem */
if (IS_UTILITY_BUT_NOT_SINGLENODE())
return size;
	/* If the GUCs are not enabled, bypass Instrumentation in shmem */
if (!gp_enable_query_metrics || gp_instrument_shmem_size <= 0)
return size;
number_slots = InstrShmemNumSlots();
if (number_slots <= 0)
return size;
size = add_size(size, sizeof(InstrumentationHeader));
size = add_size(size, mul_size(number_slots, sizeof(InstrumentationSlot)));
return size;
}
/* Initialize shmem space and construct the free list of Instrumentation slots */
void
InstrShmemInit(void)
{
Size size, number_slots;
InstrumentationSlot *slot;
InstrumentationHeader *header;
int i;
number_slots = InstrShmemNumSlots();
size = InstrShmemSize();
if (size <= 0)
return;
/* Allocate space from Shmem */
header = (InstrumentationHeader *) ShmemAlloc(size);
if (!header)
ereport(FATAL, (errcode(ERRCODE_OUT_OF_MEMORY), errmsg("out of shared memory")));
	/* Fill the header and all slots with PATTERN to mark them as free, then set up the header */
memset(header, PATTERN, size);
/* pointer to the first Instrumentation slot */
slot = (InstrumentationSlot *) (header + 1);
/* header points to the first slot */
header->head = slot;
header->free = number_slots;
SpinLockInit(&header->lock);
	/* Each slot points to the next one to construct the free list */
for (i = 0; i < number_slots - 1; i++)
GetInstrumentNext(&slot[i]) = &slot[i + 1];
GetInstrumentNext(&slot[i]) = NULL;
	/* Finished initializing the free list */
InstrumentGlobal = header;
if (NULL != InstrumentGlobal && !instrumentResownerCallbackRegistered)
{
/*
* Register a callback function in ResourceOwner to recycle Instr in
* shmem
*/
RegisterResourceReleaseCallback(instrShmemRecycleCallback, NULL);
instrumentResownerCallbackRegistered = true;
}
}
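/*
 * After InstrShmemInit() the shared area is a header followed by the slots,
 * with a singly linked free list threaded through the slots themselves:
 *
 *		header->head -> slot[0] -> slot[1] -> ... -> slot[n-1] -> NULL
 *
 * pickInstrFromShmem() pops slots off the head; instrShmemRecycleCallback()
 * pushes recycled slots back onto it.
 */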
/*
 * This is the GPDB replacement of InstrAlloc, used by ExecInitNode to get an
 * Instrumentation struct.
 *
 * Use shmem if gp_enable_query_metrics is on and a free slot is available.
 * Otherwise use local memory.
 */
Instrumentation *
GpInstrAlloc(const Plan *node, int instrument_options, bool async_mode)
{
Instrumentation *instr = NULL;
if (shouldPickInstrInShmem(nodeTag(node)))
instr = pickInstrFromShmem(node, instrument_options);
if (instr == NULL)
instr = InstrAlloc(1, instrument_options, async_mode);
return instr;
}
static bool
shouldPickInstrInShmem(NodeTag tag)
{
/* For utility mode, don't alloc in shmem */
if (IS_UTILITY_BUT_NOT_SINGLENODE())
return false;
if (!gp_enable_query_metrics || NULL == InstrumentGlobal)
return false;
switch (tag)
{
case T_SeqScan:
			/*
			 * If a table has many partitions, the planner will generate a
			 * plan with many scan nodes under an Append node. If there are
			 * too many partitions, such a plan would occupy too many slots.
			 * Therefore we impose a per-backend limit on the number of shmem
			 * slots used by scan nodes; instrumentation beyond that limit is
			 * allocated in local memory.
			 */
if (scanNodeCounter >= MAX_SCAN_ON_SHMEM)
return false;
scanNodeCounter++;
break;
default:
break;
}
return true;
}
/*
 * Pick an Instrumentation from the free slots in shmem.
 * Return NULL when there are no more free slots in shmem.
 *
 * An Instrumentation returned by this function must be recycled back to the
 * free list when the query is done; see instrShmemRecycleCallback for the
 * recycling behavior.
 */
static Instrumentation *
pickInstrFromShmem(const Plan *plan, int instrument_options)
{
Instrumentation *instr = NULL;
InstrumentationSlot *slot = NULL;
InstrumentationResownerSet *item;
	/* Lock to protect writes to the header */
SpinLockAcquire(&InstrumentGlobal->lock);
/* Pick the first free slot */
slot = InstrumentGlobal->head;
if (NULL != slot && SlotIsEmpty(slot))
{
/* Header points to the next free slot */
InstrumentGlobal->head = GetInstrumentNext(slot);
InstrumentGlobal->free--;
}
SpinLockRelease(&InstrumentGlobal->lock);
if (NULL != slot && SlotIsEmpty(slot))
{
memset(slot, 0x00, sizeof(InstrumentationSlot));
/* initialize the picked slot */
instr = &(slot->data);
slot->segid = (int16) GpIdentity.segindex;
slot->pid = MyProcPid;
gp_gettmid(&(slot->tmid));
slot->ssid = gp_session_id;
slot->ccnt = gp_command_count;
slot->nid = (int16) plan->plan_node_id;
MemoryContext contextSave = MemoryContextSwitchTo(TopMemoryContext);
item = (InstrumentationResownerSet *) palloc0(sizeof(InstrumentationResownerSet));
item->owner = CurrentResourceOwner;
item->slot = slot;
item->next = slotsOccupied;
slotsOccupied = item;
MemoryContextSwitchTo(contextSave);
}
	if (NULL != instr && (instrument_options & (INSTRUMENT_TIMER | INSTRUMENT_CDB)))
{
instr->need_timer = (instrument_options & INSTRUMENT_TIMER) != 0;
instr->need_cdb = (instrument_options & INSTRUMENT_CDB) != 0;
}
return instr;
}
/*
* Recycle instrumentation in shmem
*/
static void
instrShmemRecycleCallback(ResourceReleasePhase phase, bool isCommit, bool isTopLevel, void *arg)
{
InstrumentationResownerSet *next;
InstrumentationResownerSet *curr;
InstrumentationSlot *slot;
if (NULL == InstrumentGlobal || NULL == slotsOccupied || phase != RESOURCE_RELEASE_AFTER_LOCKS)
return;
/* Reset scanNodeCounter */
scanNodeCounter = 0;
next = slotsOccupied;
slotsOccupied = NULL;
SpinLockAcquire(&InstrumentGlobal->lock);
while (next)
{
curr = next;
next = curr->next;
if (curr->owner != CurrentResourceOwner)
{
curr->next = slotsOccupied;
slotsOccupied = curr;
continue;
}
slot = curr->slot;
/* Recycle Instrumentation slot back to the free list */
memset(slot, PATTERN, sizeof(InstrumentationSlot));
GetInstrumentNext(slot) = InstrumentGlobal->head;
InstrumentGlobal->head = slot;
InstrumentGlobal->free++;
pfree(curr);
}
SpinLockRelease(&InstrumentGlobal->lock);
}
/*
 * Convert PgStartTime from TimestampTz to int32. Separated from gp_gettmid()
 * so that the elog() in gp_gettmid() cannot cause a panic when running unit
 * tests.
 * Returns -1 for an invalid PgStartTime or a value that overflows int32.
 */
static int32
gp_gettmid_helper(void)
{
pg_time_t time;
if (PgStartTime < 0)
return -1;
time = timestamptz_to_time_t(PgStartTime);
if (time > INT32_MAX)
return -1;
return (int32)time;
}
/*
 * Wrapper for gp_gettmid_helper(); PANICs if the conversion fails.
*/
void
gp_gettmid(int32* tmid)
{
int32 time = gp_gettmid_helper();
if (time == -1)
elog(PANIC, "time_t converted from PgStartTime is too large");
*tmid = time;
}
/* helper functions for WAL usage accumulation */
static void
WalUsageAdd(WalUsage *dst, WalUsage *add)
{
dst->wal_bytes += add->wal_bytes;
dst->wal_records += add->wal_records;
dst->wal_fpi += add->wal_fpi;
}
void
WalUsageAccumDiff(WalUsage *dst, const WalUsage *add, const WalUsage *sub)
{
dst->wal_bytes += add->wal_bytes - sub->wal_bytes;
dst->wal_records += add->wal_records - sub->wal_records;
dst->wal_fpi += add->wal_fpi - sub->wal_fpi;
}