| /*------------------------------------------------------------------------- |
| * |
| * instrument.c |
| * functions for instrumentation of plan execution |
| * |
| * |
| * Portions Copyright (c) 2006-2009, Greenplum inc |
| * Portions Copyright (c) 2012-Present VMware, Inc. or its affiliates. |
| * Copyright (c) 2001-2023, PostgreSQL Global Development Group |
| * |
| * IDENTIFICATION |
| * src/backend/executor/instrument.c |
| * |
| *------------------------------------------------------------------------- |
| */ |
| #include "postgres.h" |
| |
| #include <unistd.h> |
| |
| #include "cdb/cdbvars.h" |
| #include "storage/spin.h" |
| #include "executor/instrument.h" |
| #include "utils/memutils.h" |
| #include "utils/timestamp.h" |
| #include "miscadmin.h" |
| #include "storage/shmem.h" |
| #include "cdb/cdbdtxcontextinfo.h" |
| #include "cdb/cdbtm.h" |
| |
| BufferUsage pgBufferUsage; |
| static BufferUsage save_pgBufferUsage; |
| WalUsage pgWalUsage; |
| static WalUsage save_pgWalUsage; |
| |
| static void BufferUsageAdd(BufferUsage *dst, const BufferUsage *add); |
| static void WalUsageAdd(WalUsage *dst, WalUsage *add); |
| |
| /* GPDB specific */ |
| static bool shouldPickInstrInShmem(NodeTag tag); |
| static Instrumentation *pickInstrFromShmem(const Plan *plan, int instrument_options); |
| static void instrShmemRecycleCallback(ResourceReleasePhase phase, bool isCommit, |
| bool isTopLevel, void *arg); |
| |
| InstrumentationHeader *InstrumentGlobal = NULL; |
| static int scanNodeCounter = 0; |
| static int shmemNumSlots = -1; |
| static bool instrumentResownerCallbackRegistered = false; |
| static InstrumentationResownerSet *slotsOccupied = NULL; |
| static void InstrStopNodeAsync(Instrumentation *instr, uint64 nTuples); |
| static void InstrStopNodeSync(Instrumentation *instr, uint64 nTuples); |
| |
| |
| /* Allocate new instrumentation structure(s) */ |
| Instrumentation * |
| InstrAlloc(int n, int instrument_options, bool async_mode) |
| { |
| Instrumentation *instr; |
| |
| /* initialize all fields to zeroes, then modify as needed */ |
| instr = palloc0(n * sizeof(Instrumentation)); |
| if (instrument_options & (INSTRUMENT_BUFFERS | INSTRUMENT_TIMER | INSTRUMENT_WAL | INSTRUMENT_CDB)) |
| { |
| bool need_buffers = (instrument_options & INSTRUMENT_BUFFERS) != 0; |
| bool need_wal = (instrument_options & INSTRUMENT_WAL) != 0; |
| bool need_timer = (instrument_options & INSTRUMENT_TIMER) != 0; |
| bool need_cdb = (instrument_options & INSTRUMENT_CDB) != 0; |
| int i; |
| |
| for (i = 0; i < n; i++) |
| { |
| instr[i].need_bufusage = need_buffers; |
| instr[i].need_walusage = need_wal; |
| instr[i].need_timer = need_timer; |
| instr[i].need_cdb = need_cdb; |
| instr[i].async_mode = async_mode; |
| } |
| } |
| |
| return instr; |
| } |
| |
| /* Initialize a pre-allocated instrumentation structure. */ |
| void |
| InstrInit(Instrumentation *instr, int instrument_options) |
| { |
| memset(instr, 0, sizeof(Instrumentation)); |
| instr->need_bufusage = (instrument_options & INSTRUMENT_BUFFERS) != 0; |
| instr->need_walusage = (instrument_options & INSTRUMENT_WAL) != 0; |
| instr->need_timer = (instrument_options & INSTRUMENT_TIMER) != 0; |
| } |
| |
| /* Entry to a plan node */ |
| void |
| InstrStartNode(Instrumentation *instr) |
| { |
| if (instr->need_timer && |
| !INSTR_TIME_SET_CURRENT_LAZY(instr->starttime)) |
| elog(ERROR, "InstrStartNode called twice in a row"); |
| |
| /* save buffer usage totals at node entry, if needed */ |
| if (instr->need_bufusage) |
| instr->bufusage_start = pgBufferUsage; |
| |
| if (instr->need_walusage) |
| instr->walusage_start = pgWalUsage; |
| } |
| |
| /* Exit from a plan node */ |
| static void |
| InstrStopNodeSync(Instrumentation *instr, uint64 nTuples) |
| { |
| instr_time endtime; |
| instr_time starttime; |
| |
| starttime = instr->starttime; |
| |
| /* count the returned tuples */ |
| instr->tuplecount += nTuples; |
| |
| /* let's update the time only if the timer was requested */ |
| if (instr->need_timer) |
| { |
| if (INSTR_TIME_IS_ZERO(instr->starttime)) |
| elog(ERROR, "InstrStopNode called without start"); |
| |
| INSTR_TIME_SET_CURRENT_COARSE(endtime); |
| INSTR_TIME_ACCUM_DIFF(instr->counter, endtime, instr->starttime); |
| |
| INSTR_TIME_SET_ZERO(instr->starttime); |
| } |
| |
| /* Add delta of buffer usage since entry to node's totals */ |
| if (instr->need_bufusage) |
| BufferUsageAccumDiff(&instr->bufusage, |
| &pgBufferUsage, &instr->bufusage_start); |
| |
| if (instr->need_walusage) |
| WalUsageAccumDiff(&instr->walusage, |
| &pgWalUsage, &instr->walusage_start); |
| |
| /* Is this the first tuple of this cycle? */ |
| if (!instr->running) |
| { |
| instr->running = true; |
| instr->firsttuple = INSTR_TIME_GET_DOUBLE(instr->counter); |
| /* CDB: save this start time as the first start */ |
| instr->firststart = starttime; |
| } |
| } |
| |
| static void |
| InstrStopNodeAsync(Instrumentation *instr, uint64 nTuples) |
| { |
| uint64 save_tuplecount = instr->tuplecount; |
| InstrStopNodeSync(instr, nTuples); |
| if (instr->running) |
| { |
| /* |
| * In async mode, if the plan node hadn't emitted any tuples before, |
| * this might be the first tuple |
| */ |
| if (instr->async_mode && save_tuplecount < 1) |
| instr->firsttuple = INSTR_TIME_GET_DOUBLE(instr->counter); |
| } |
| } |
| |
| void |
| InstrStopNode(Instrumentation *instr, uint64 nTuples) |
| { |
| if (unlikely(instr->async_mode)) |
| InstrStopNodeAsync(instr, nTuples); |
| else |
| InstrStopNodeSync(instr, nTuples); |
| } |
| |
| /* Update tuple count */ |
| void |
| InstrUpdateTupleCount(Instrumentation *instr, uint64 nTuples) |
| { |
| /* count the returned tuples */ |
| instr->tuplecount += nTuples; |
| } |
| |
| /* Finish a run cycle for a plan node */ |
| void |
| InstrEndLoop(Instrumentation *instr) |
| { |
| double totaltime; |
| |
| /* Skip if nothing has happened, or already shut down */ |
| if (!instr->running) |
| return; |
| |
| if (!INSTR_TIME_IS_ZERO(instr->starttime)) |
| elog(ERROR, "InstrEndLoop called on running node"); |
| |
| /* Accumulate per-cycle statistics into totals */ |
| totaltime = INSTR_TIME_GET_DOUBLE(instr->counter); |
| |
| /* CDB: Report startup time from only the first cycle. */ |
| if (instr->nloops == 0) |
| instr->startup = instr->firsttuple; |
| |
| instr->total += totaltime; |
| instr->ntuples += instr->tuplecount; |
| instr->nloops += 1; |
| |
| /* Reset for next cycle (if any) */ |
| instr->running = false; |
| INSTR_TIME_SET_ZERO(instr->starttime); |
| INSTR_TIME_SET_ZERO(instr->counter); |
| instr->firsttuple = 0; |
| instr->tuplecount = 0; |
| } |
| |
| /* aggregate instrumentation information */ |
| void |
| InstrAggNode(Instrumentation *dst, Instrumentation *add) |
| { |
| if (!dst->running && add->running) |
| { |
| dst->running = true; |
| dst->firsttuple = add->firsttuple; |
| } |
| else if (dst->running && add->running && dst->firsttuple > add->firsttuple) |
| dst->firsttuple = add->firsttuple; |
| |
| INSTR_TIME_ADD(dst->counter, add->counter); |
| |
| dst->tuplecount += add->tuplecount; |
| dst->startup += add->startup; |
| dst->total += add->total; |
| dst->ntuples += add->ntuples; |
| dst->ntuples2 += add->ntuples2; |
| dst->nloops += add->nloops; |
| dst->nfiltered1 += add->nfiltered1; |
| dst->nfiltered2 += add->nfiltered2; |
| |
	/* Add the other node's buffer usage to this node's totals */
| if (dst->need_bufusage) |
| BufferUsageAdd(&dst->bufusage, &add->bufusage); |
| |
| if (dst->need_walusage) |
| WalUsageAdd(&dst->walusage, &add->walusage); |
| } |
| |
| /* note current values during parallel executor startup */ |
| void |
| InstrStartParallelQuery(void) |
| { |
| save_pgBufferUsage = pgBufferUsage; |
| save_pgWalUsage = pgWalUsage; |
| } |
| |
| /* report usage after parallel executor shutdown */ |
| void |
| InstrEndParallelQuery(BufferUsage *bufusage, WalUsage *walusage) |
| { |
| memset(bufusage, 0, sizeof(BufferUsage)); |
| BufferUsageAccumDiff(bufusage, &pgBufferUsage, &save_pgBufferUsage); |
| memset(walusage, 0, sizeof(WalUsage)); |
| WalUsageAccumDiff(walusage, &pgWalUsage, &save_pgWalUsage); |
| } |
| |
| /* accumulate work done by workers in leader's stats */ |
| void |
| InstrAccumParallelQuery(BufferUsage *bufusage, WalUsage *walusage) |
| { |
| BufferUsageAdd(&pgBufferUsage, bufusage); |
| WalUsageAdd(&pgWalUsage, walusage); |
| } |
| |
| /* dst += add */ |
| static void |
| BufferUsageAdd(BufferUsage *dst, const BufferUsage *add) |
| { |
| dst->shared_blks_hit += add->shared_blks_hit; |
| dst->shared_blks_read += add->shared_blks_read; |
| dst->shared_blks_dirtied += add->shared_blks_dirtied; |
| dst->shared_blks_written += add->shared_blks_written; |
| dst->local_blks_hit += add->local_blks_hit; |
| dst->local_blks_read += add->local_blks_read; |
| dst->local_blks_dirtied += add->local_blks_dirtied; |
| dst->local_blks_written += add->local_blks_written; |
| dst->temp_blks_read += add->temp_blks_read; |
| dst->temp_blks_written += add->temp_blks_written; |
| INSTR_TIME_ADD(dst->blk_read_time, add->blk_read_time); |
| INSTR_TIME_ADD(dst->blk_write_time, add->blk_write_time); |
| INSTR_TIME_ADD(dst->temp_blk_read_time, add->temp_blk_read_time); |
| INSTR_TIME_ADD(dst->temp_blk_write_time, add->temp_blk_write_time); |
| } |
| |
| /* dst += add - sub */ |
| void |
| BufferUsageAccumDiff(BufferUsage *dst, |
| const BufferUsage *add, |
| const BufferUsage *sub) |
| { |
| dst->shared_blks_hit += add->shared_blks_hit - sub->shared_blks_hit; |
| dst->shared_blks_read += add->shared_blks_read - sub->shared_blks_read; |
| dst->shared_blks_dirtied += add->shared_blks_dirtied - sub->shared_blks_dirtied; |
| dst->shared_blks_written += add->shared_blks_written - sub->shared_blks_written; |
| dst->local_blks_hit += add->local_blks_hit - sub->local_blks_hit; |
| dst->local_blks_read += add->local_blks_read - sub->local_blks_read; |
| dst->local_blks_dirtied += add->local_blks_dirtied - sub->local_blks_dirtied; |
| dst->local_blks_written += add->local_blks_written - sub->local_blks_written; |
| dst->temp_blks_read += add->temp_blks_read - sub->temp_blks_read; |
| dst->temp_blks_written += add->temp_blks_written - sub->temp_blks_written; |
| INSTR_TIME_ACCUM_DIFF(dst->blk_read_time, |
| add->blk_read_time, sub->blk_read_time); |
| INSTR_TIME_ACCUM_DIFF(dst->blk_write_time, |
| add->blk_write_time, sub->blk_write_time); |
| INSTR_TIME_ACCUM_DIFF(dst->temp_blk_read_time, |
| add->temp_blk_read_time, sub->temp_blk_read_time); |
| INSTR_TIME_ACCUM_DIFF(dst->temp_blk_write_time, |
| add->temp_blk_write_time, sub->temp_blk_write_time); |
| } |
| |
/* Calculate the number of slots from gp_instrument_shmem_size */
| Size |
| InstrShmemNumSlots(void) |
| { |
	if (shmemNumSlots < 0)
	{
		/* do the arithmetic in int64 to avoid int overflow and unsigned wraparound */
		int64		avail = (int64) gp_instrument_shmem_size * 1024 - (int64) sizeof(InstrumentationHeader);

		shmemNumSlots = (avail > 0) ? (int) (avail / (int64) sizeof(InstrumentationSlot)) : 0;
	}
| return shmemNumSlots; |
| } |
| |
/* Compute the shmem space needed for a header and an array of Instrumentation slots */
| Size |
| InstrShmemSize(void) |
| { |
| Size size = 0; |
| Size number_slots; |
| |
	/* If started in utility mode, don't allocate Instrumentation in shmem */
| if (IS_UTILITY_BUT_NOT_SINGLENODE()) |
| return size; |
| |
	/* If the GUCs are not enabled, bypass shmem Instrumentation */
| if (!gp_enable_query_metrics || gp_instrument_shmem_size <= 0) |
| return size; |
| |
| number_slots = InstrShmemNumSlots(); |
| |
	if (number_slots == 0)
| return size; |
| |
| size = add_size(size, sizeof(InstrumentationHeader)); |
| size = add_size(size, mul_size(number_slots, sizeof(InstrumentationSlot))); |
| |
| return size; |
| } |
| |
/* Initialize the shmem space and link all Instrumentation slots into a free list */
| void |
| InstrShmemInit(void) |
| { |
| Size size, number_slots; |
| InstrumentationSlot *slot; |
| InstrumentationHeader *header; |
| int i; |
| |
| number_slots = InstrShmemNumSlots(); |
| size = InstrShmemSize(); |
	if (size == 0)
| return; |
| |
| /* Allocate space from Shmem */ |
| header = (InstrumentationHeader *) ShmemAlloc(size); |
| if (!header) |
| ereport(FATAL, (errcode(ERRCODE_OUT_OF_MEMORY), errmsg("out of shared memory"))); |
| |
	/* Fill the header and all slots with the empty-slot PATTERN, then set fields as needed */
| memset(header, PATTERN, size); |
| |
| /* pointer to the first Instrumentation slot */ |
| slot = (InstrumentationSlot *) (header + 1); |
| |
| /* header points to the first slot */ |
| header->head = slot; |
| header->free = number_slots; |
| SpinLockInit(&header->lock); |
| |
	/* Each slot points to the next one, forming the free list */
| for (i = 0; i < number_slots - 1; i++) |
| GetInstrumentNext(&slot[i]) = &slot[i + 1]; |
| GetInstrumentNext(&slot[i]) = NULL; |
| |
	/* Free list fully initialized; publish the header pointer */
| InstrumentGlobal = header; |
| |
| if (NULL != InstrumentGlobal && !instrumentResownerCallbackRegistered) |
| { |
| /* |
| * Register a callback function in ResourceOwner to recycle Instr in |
| * shmem |
| */ |
| RegisterResourceReleaseCallback(instrShmemRecycleCallback, NULL); |
| instrumentResownerCallbackRegistered = true; |
| } |
| } |
| |
| /* |
| * This is GPDB replacement of InstrAlloc for ExecInitNode to get an |
| * Instrumentation struct |
| * |
| * Use shmem if gp_enable_query_metrics is on and there is free slot. |
| * Otherwise use local memory. |
| */ |
| Instrumentation * |
| GpInstrAlloc(const Plan *node, int instrument_options, bool async_mode) |
| { |
| Instrumentation *instr = NULL; |
| |
| if (shouldPickInstrInShmem(nodeTag(node))) |
| instr = pickInstrFromShmem(node, instrument_options); |
| |
| if (instr == NULL) |
| instr = InstrAlloc(1, instrument_options, async_mode); |
| |
| return instr; |
| } |
| |
| static bool |
| shouldPickInstrInShmem(NodeTag tag) |
| { |
	/* In utility mode, don't allocate in shmem */
| if (IS_UTILITY_BUT_NOT_SINGLENODE()) |
| return false; |
| |
| if (!gp_enable_query_metrics || NULL == InstrumentGlobal) |
| return false; |
| |
| switch (tag) |
| { |
| case T_SeqScan: |
| |
| /* |
| * If table has many partitions, Postgres planner will generate a |
| * plan with many SCAN nodes under a APPEND node. If the number of |
| * partitions are too many, this plan will occupy too many slots. |
| * Here is a limitation on number of shmem slots used by scan |
| * nodes for each backend. Instruments exceeding the limitation |
| * are allocated local memory. |
| */ |
| if (scanNodeCounter >= MAX_SCAN_ON_SHMEM) |
| return false; |
| scanNodeCounter++; |
| break; |
| default: |
| break; |
| } |
| return true; |
| } |
| |
| /* |
| * Pick an Instrumentation from free slots in Shmem. |
| * Return NULL when no more free slots in Shmem. |
| * |
| * Instrumentation returned by this function requires to be |
| * recycled back to the free slots list when the query is done. |
| * See instrShmemRecycleCallback for recycling behavior |
| */ |
| static Instrumentation * |
| pickInstrFromShmem(const Plan *plan, int instrument_options) |
| { |
| Instrumentation *instr = NULL; |
| InstrumentationSlot *slot = NULL; |
| InstrumentationResownerSet *item; |
| |
	/* Lock to protect writes to the header */
| SpinLockAcquire(&InstrumentGlobal->lock); |
| |
| /* Pick the first free slot */ |
| slot = InstrumentGlobal->head; |
| if (NULL != slot && SlotIsEmpty(slot)) |
| { |
| /* Header points to the next free slot */ |
| InstrumentGlobal->head = GetInstrumentNext(slot); |
| InstrumentGlobal->free--; |
| } |
| |
| SpinLockRelease(&InstrumentGlobal->lock); |
| |
| if (NULL != slot && SlotIsEmpty(slot)) |
| { |
| memset(slot, 0x00, sizeof(InstrumentationSlot)); |
| /* initialize the picked slot */ |
| instr = &(slot->data); |
| slot->segid = (int16) GpIdentity.segindex; |
| slot->pid = MyProcPid; |
| gp_gettmid(&(slot->tmid)); |
| slot->ssid = gp_session_id; |
| slot->ccnt = gp_command_count; |
| slot->nid = (int16) plan->plan_node_id; |
| |
| MemoryContext contextSave = MemoryContextSwitchTo(TopMemoryContext); |
| |
| item = (InstrumentationResownerSet *) palloc0(sizeof(InstrumentationResownerSet)); |
| item->owner = CurrentResourceOwner; |
| item->slot = slot; |
| item->next = slotsOccupied; |
| slotsOccupied = item; |
| MemoryContextSwitchTo(contextSave); |
| } |
| |
	if (NULL != instr && (instrument_options & (INSTRUMENT_TIMER | INSTRUMENT_CDB)) != 0)
| { |
| instr->need_timer = (instrument_options & INSTRUMENT_TIMER) != 0; |
| instr->need_cdb = (instrument_options & INSTRUMENT_CDB) != 0; |
| } |
| |
| return instr; |
| } |
| |
| /* |
| * Recycle instrumentation in shmem |
| */ |
| static void |
| instrShmemRecycleCallback(ResourceReleasePhase phase, bool isCommit, bool isTopLevel, void *arg) |
| { |
| InstrumentationResownerSet *next; |
| InstrumentationResownerSet *curr; |
| InstrumentationSlot *slot; |
| |
| if (NULL == InstrumentGlobal || NULL == slotsOccupied || phase != RESOURCE_RELEASE_AFTER_LOCKS) |
| return; |
| |
| /* Reset scanNodeCounter */ |
| scanNodeCounter = 0; |
| |
| next = slotsOccupied; |
| slotsOccupied = NULL; |
| SpinLockAcquire(&InstrumentGlobal->lock); |
| while (next) |
| { |
| curr = next; |
| next = curr->next; |
| if (curr->owner != CurrentResourceOwner) |
| { |
| curr->next = slotsOccupied; |
| slotsOccupied = curr; |
| continue; |
| } |
| |
| slot = curr->slot; |
| |
| /* Recycle Instrumentation slot back to the free list */ |
| memset(slot, PATTERN, sizeof(InstrumentationSlot)); |
| |
| GetInstrumentNext(slot) = InstrumentGlobal->head; |
| InstrumentGlobal->head = slot; |
| InstrumentGlobal->free++; |
| |
| pfree(curr); |
| } |
| SpinLockRelease(&InstrumentGlobal->lock); |
| } |
| |
| /* |
| * Cast PgStartTime from TimestampTz to int32. Separated from gp_gettmid() to avoid elog() in |
| * gp_gettmid() to cause panic when running unit tests. |
| * Return -1 for invalid PgStartTime or overflow values. |
| */ |
static int32
gp_gettmid_helper(void)
| { |
| pg_time_t time; |
| if (PgStartTime < 0) |
| return -1; |
| time = timestamptz_to_time_t(PgStartTime); |
| if (time > INT32_MAX) |
| return -1; |
| return (int32)time; |
| } |
| |
| /* |
| * Wrapper for gp_gettmid_helper() |
| */ |
| void |
| gp_gettmid(int32* tmid) |
| { |
| int32 time = gp_gettmid_helper(); |
	if (time == -1)
		elog(PANIC, "PgStartTime is invalid or its time_t conversion overflows int32");
| *tmid = time; |
| } |
| |
| /* helper functions for WAL usage accumulation */ |
| static void |
| WalUsageAdd(WalUsage *dst, WalUsage *add) |
| { |
| dst->wal_bytes += add->wal_bytes; |
| dst->wal_records += add->wal_records; |
| dst->wal_fpi += add->wal_fpi; |
| } |
| |
| void |
| WalUsageAccumDiff(WalUsage *dst, const WalUsage *add, const WalUsage *sub) |
| { |
| dst->wal_bytes += add->wal_bytes - sub->wal_bytes; |
| dst->wal_records += add->wal_records - sub->wal_records; |
| dst->wal_fpi += add->wal_fpi - sub->wal_fpi; |
| } |