| /*------------------------------------------------------------------------- |
| * |
| * xlogprefetcher.c |
| * Prefetching support for recovery. |
| * |
| * Portions Copyright (c) 2022-2023, PostgreSQL Global Development Group |
| * Portions Copyright (c) 1994, Regents of the University of California |
| * |
| * |
| * IDENTIFICATION |
| * src/backend/access/transam/xlogprefetcher.c |
| * |
| * This module provides a drop-in replacement for an XLogReader that tries to |
| * minimize I/O stalls by looking ahead in the WAL. If blocks that will be |
| * accessed in the near future are not already in the buffer pool, it initiates |
| * I/Os that might complete before the caller eventually needs the data. When |
| * referenced blocks are found in the buffer pool already, the buffer is |
| * recorded in the decoded record so that XLogReadBufferForRedo() can try to |
| * avoid a second buffer mapping table lookup. |
| * |
| * Currently, only the main fork is considered for prefetching, and |
| * prefetching is only effective on systems where PrefetchBuffer() does |
| * something useful (mainly Linux). |
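| * |
| * A rough usage sketch (variable names here are illustrative; the real |
| * caller is the recovery loop, which adds error handling and timeline |
| * logic): |
| * |
| * XLogPrefetcher *prefetcher = XLogPrefetcherAllocate(reader); |
| * |
| * XLogPrefetcherBeginRead(prefetcher, start_lsn); |
| * while ((record = XLogPrefetcherReadRecord(prefetcher, &errmsg)) != NULL) |
| * ... redo the record, which advances the replay position ... |
| * |
| * XLogPrefetcherFree(prefetcher); |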
| * |
| *------------------------------------------------------------------------- |
| */ |
| |
| #include "postgres.h" |
| |
| #include "access/xlog.h" |
| #include "access/xlogprefetcher.h" |
| #include "access/xlogreader.h" |
| #include "access/xlogutils.h" |
| #include "catalog/pg_class.h" |
| #include "catalog/pg_control.h" |
| #include "catalog/storage_xlog.h" |
| #include "commands/dbcommands_xlog.h" |
| #include "funcapi.h" |
| #include "miscadmin.h" |
| #include "pgstat.h" |
| #include "port/atomics.h" |
| #include "storage/bufmgr.h" |
| #include "storage/shmem.h" |
| #include "storage/smgr.h" |
| #include "utils/fmgrprotos.h" |
| #include "utils/guc_hooks.h" |
| #include "utils/hsearch.h" |
| #include "utils/timestamp.h" |
| |
| /* |
| * Every time we process this much WAL, we'll update the values in |
| * pg_stat_recovery_prefetch. |
| */ |
| #define XLOGPREFETCHER_STATS_DISTANCE BLCKSZ |
| |
| /* |
| * To detect repeated access to the same block and skip useless extra system |
| * calls, we remember a small window of recently prefetched blocks. |
| */ |
| #define XLOGPREFETCHER_SEQ_WINDOW_SIZE 4 |
| |
| /* |
| * When maintenance_io_concurrency is not saturated, we're prepared to look |
| * ahead up to N times that number of block references. |
| */ |
| #define XLOGPREFETCHER_DISTANCE_MULTIPLIER 4 |
| |
| /* Define to log internal debugging messages. */ |
| /* #define XLOGPREFETCHER_DEBUG_LEVEL LOG */ |
| |
| /* GUCs */ |
| int recovery_prefetch = RECOVERY_PREFETCH_TRY; |
| |
| #ifdef USE_PREFETCH |
| #define RecoveryPrefetchEnabled() \ |
| (recovery_prefetch != RECOVERY_PREFETCH_OFF && \ |
| maintenance_io_concurrency > 0) |
| #else |
| #define RecoveryPrefetchEnabled() false |
| #endif |
| |
| static int XLogPrefetchReconfigureCount = 0; |
| |
| /* |
| * Enum used to report whether an IO should be started. |
| */ |
| typedef enum |
| { |
| LRQ_NEXT_NO_IO, |
| LRQ_NEXT_IO, |
| LRQ_NEXT_AGAIN |
| } LsnReadQueueNextStatus; |
| |
| /* |
| * Type of callback that can decide which block to prefetch next. For now |
| * there is only one. |
| */ |
| typedef LsnReadQueueNextStatus (*LsnReadQueueNextFun) (uintptr_t lrq_private, |
| XLogRecPtr *lsn); |
| |
| /* |
| * A simple circular queue of LSNs, used to control the number of |
| * (potentially) inflight IOs. This stands in for a later more general IO |
| * control mechanism, which is why it has the apparently unnecessary |
| * indirection through a function pointer. |
| */ |
| typedef struct LsnReadQueue |
| { |
| LsnReadQueueNextFun next; |
| uintptr_t lrq_private; |
| uint32 max_inflight; |
| uint32 inflight; |
| uint32 completed; |
| uint32 head; |
| uint32 tail; |
| uint32 size; |
| struct |
| { |
| bool io; |
| XLogRecPtr lsn; |
| } queue[FLEXIBLE_ARRAY_MEMBER]; |
| } LsnReadQueue; |
| |
| /* |
| * A prefetcher. This is a mechanism that wraps an XLogReader, prefetching |
| * blocks that will soon be referenced, to try to avoid IO stalls. |
| */ |
| struct XLogPrefetcher |
| { |
| /* WAL reader and current reading state. */ |
| XLogReaderState *reader; |
| DecodedXLogRecord *record; |
| int next_block_id; |
| |
| /* When to publish stats. */ |
| XLogRecPtr next_stats_shm_lsn; |
| |
| /* Book-keeping to avoid accessing blocks that don't exist yet. */ |
| HTAB *filter_table; |
| dlist_head filter_queue; |
| |
| /* Book-keeping to avoid repeat prefetches. */ |
| RelFileLocator recent_rlocator[XLOGPREFETCHER_SEQ_WINDOW_SIZE]; |
| BlockNumber recent_block[XLOGPREFETCHER_SEQ_WINDOW_SIZE]; |
| int recent_idx; |
| |
| /* Book-keeping to disable prefetching temporarily. */ |
| XLogRecPtr no_readahead_until; |
| |
| /* IO depth manager. */ |
| LsnReadQueue *streaming_read; |
| |
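| /* Starting LSN of the most recent XLogPrefetcherBeginRead(); no readahead |
| * happens until the first record read from there has been consumed. */ |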
| XLogRecPtr begin_ptr; |
| |
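| /* Value of XLogPrefetchReconfigureCount when streaming_read was set up. */ |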
| int reconfigure_count; |
| }; |
| |
| /* |
| * A temporary filter used to track block ranges that haven't been created |
| * yet, whole relations that haven't been created yet, and whole relations |
| * that (we assume) have already been dropped, or will be created by bulk WAL |
| * operators. |
| */ |
| typedef struct XLogPrefetcherFilter |
| { |
| RelFileLocator rlocator; |
| XLogRecPtr filter_until_replayed; |
| BlockNumber filter_from_block; |
| dlist_node link; |
| } XLogPrefetcherFilter; |
| |
| /* |
| * Counters exposed in shared memory for pg_stat_recovery_prefetch. |
| */ |
| typedef struct XLogPrefetchStats |
| { |
| pg_atomic_uint64 reset_time; /* Time of last reset. */ |
| pg_atomic_uint64 prefetch; /* Prefetches initiated. */ |
| pg_atomic_uint64 hit; /* Blocks already in cache. */ |
| pg_atomic_uint64 skip_init; /* Zero-inited blocks skipped. */ |
| pg_atomic_uint64 skip_new; /* New/missing blocks filtered. */ |
| pg_atomic_uint64 skip_fpw; /* FPWs skipped. */ |
| pg_atomic_uint64 skip_rep; /* Repeat accesses skipped. */ |
| |
| /* Dynamic values */ |
| int wal_distance; /* Number of WAL bytes ahead. */ |
| int block_distance; /* Number of block references ahead. */ |
| int io_depth; /* Number of I/Os in progress. */ |
| } XLogPrefetchStats; |
| |
| static inline void XLogPrefetcherAddFilter(XLogPrefetcher *prefetcher, |
| RelFileLocator rlocator, |
| BlockNumber blockno, |
| XLogRecPtr lsn); |
| static inline bool XLogPrefetcherIsFiltered(XLogPrefetcher *prefetcher, |
| RelFileLocator rlocator, |
| BlockNumber blockno); |
| static inline void XLogPrefetcherCompleteFilters(XLogPrefetcher *prefetcher, |
| XLogRecPtr replaying_lsn); |
| static LsnReadQueueNextStatus XLogPrefetcherNextBlock(uintptr_t pgsr_private, |
| XLogRecPtr *lsn); |
| |
| static XLogPrefetchStats *SharedStats; |
| |
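| /* |
| * Create a LsnReadQueue that can track up to 'max_distance' block references, |
| * of which at most 'max_inflight' may have IOs in progress at once. 'next' is |
| * the callback that examines the next block reference and possibly starts an |
| * IO; 'lrq_private' is passed through to it. |
| */ |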
| static inline LsnReadQueue * |
| lrq_alloc(uint32 max_distance, |
| uint32 max_inflight, |
| uintptr_t lrq_private, |
| LsnReadQueueNextFun next) |
| { |
| LsnReadQueue *lrq; |
| uint32 size; |
| |
| Assert(max_distance >= max_inflight); |
| |
| size = max_distance + 1; /* full ring buffer has a gap */ |
| lrq = palloc(offsetof(LsnReadQueue, queue) + sizeof(lrq->queue[0]) * size); |
| lrq->lrq_private = lrq_private; |
| lrq->max_inflight = max_inflight; |
| lrq->size = size; |
| lrq->next = next; |
| lrq->head = 0; |
| lrq->tail = 0; |
| lrq->inflight = 0; |
| lrq->completed = 0; |
| |
| return lrq; |
| } |
| |
| static inline void |
| lrq_free(LsnReadQueue *lrq) |
| { |
| pfree(lrq); |
| } |
| |
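| /* Return the number of IOs currently counted as in flight. */ |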
| static inline uint32 |
| lrq_inflight(LsnReadQueue *lrq) |
| { |
| return lrq->inflight; |
| } |
| |
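| /* Return the number of tracked block references that didn't need an IO. */ |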
| static inline uint32 |
| lrq_completed(LsnReadQueue *lrq) |
| { |
| return lrq->completed; |
| } |
| |
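| /* |
| * Look further ahead by calling 'next' repeatedly, until the inflight or |
| * lookahead limit is reached or the callback reports there is nothing more |
| * to do for now. |
| */ |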
| static inline void |
| lrq_prefetch(LsnReadQueue *lrq) |
| { |
| /* Try to start as many IOs as we can within our limits. */ |
| while (lrq->inflight < lrq->max_inflight && |
| lrq->inflight + lrq->completed < lrq->size - 1) |
| { |
| Assert(((lrq->head + 1) % lrq->size) != lrq->tail); |
| switch (lrq->next(lrq->lrq_private, &lrq->queue[lrq->head].lsn)) |
| { |
| case LRQ_NEXT_AGAIN: |
| return; |
| case LRQ_NEXT_IO: |
| lrq->queue[lrq->head].io = true; |
| lrq->inflight++; |
| break; |
| case LRQ_NEXT_NO_IO: |
| lrq->queue[lrq->head].io = false; |
| lrq->completed++; |
| break; |
| } |
| lrq->head++; |
| if (lrq->head == lrq->size) |
| lrq->head = 0; |
| } |
| } |
| |
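| /* |
| * Report that WAL up to 'lsn' has been replayed, retiring queue entries for |
| * older LSNs and then trying to look further ahead. |
| */ |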
| static inline void |
| lrq_complete_lsn(LsnReadQueue *lrq, XLogRecPtr lsn) |
| { |
| /* |
| * We know that LSNs before 'lsn' have been replayed, so we can now assume |
| * that any IOs that were started before then have finished. |
| */ |
| while (lrq->tail != lrq->head && |
| lrq->queue[lrq->tail].lsn < lsn) |
| { |
| if (lrq->queue[lrq->tail].io) |
| lrq->inflight--; |
| else |
| lrq->completed--; |
| lrq->tail++; |
| if (lrq->tail == lrq->size) |
| lrq->tail = 0; |
| } |
| if (RecoveryPrefetchEnabled()) |
| lrq_prefetch(lrq); |
| } |
| |
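| /* |
| * Report the amount of shared memory needed for recovery prefetch statistics. |
| */ |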
| size_t |
| XLogPrefetchShmemSize(void) |
| { |
| return sizeof(XLogPrefetchStats); |
| } |
| |
| /* |
| * Reset all counters to zero. |
| */ |
| void |
| XLogPrefetchResetStats(void) |
| { |
| pg_atomic_write_u64(&SharedStats->reset_time, GetCurrentTimestamp()); |
| pg_atomic_write_u64(&SharedStats->prefetch, 0); |
| pg_atomic_write_u64(&SharedStats->hit, 0); |
| pg_atomic_write_u64(&SharedStats->skip_init, 0); |
| pg_atomic_write_u64(&SharedStats->skip_new, 0); |
| pg_atomic_write_u64(&SharedStats->skip_fpw, 0); |
| pg_atomic_write_u64(&SharedStats->skip_rep, 0); |
| } |
| |
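| /* |
| * Create the shared memory statistics area, or attach to an existing one. |
| */ |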
| void |
| XLogPrefetchShmemInit(void) |
| { |
| bool found; |
| |
| SharedStats = (XLogPrefetchStats *) |
| ShmemInitStruct("XLogPrefetchStats", |
| sizeof(XLogPrefetchStats), |
| &found); |
| |
| if (!found) |
| { |
| pg_atomic_init_u64(&SharedStats->reset_time, GetCurrentTimestamp()); |
| pg_atomic_init_u64(&SharedStats->prefetch, 0); |
| pg_atomic_init_u64(&SharedStats->hit, 0); |
| pg_atomic_init_u64(&SharedStats->skip_init, 0); |
| pg_atomic_init_u64(&SharedStats->skip_new, 0); |
| pg_atomic_init_u64(&SharedStats->skip_fpw, 0); |
| pg_atomic_init_u64(&SharedStats->skip_rep, 0); |
| } |
| } |
| |
| /* |
| * Called when any GUC is changed that affects prefetching. |
| */ |
| void |
| XLogPrefetchReconfigure(void) |
| { |
| XLogPrefetchReconfigureCount++; |
| } |
| |
| /* |
| * Increment a counter in shared memory. This is equivalent to (*counter)++ on a |
| * plain uint64 without any memory barrier or locking, except on platforms |
| * where readers can't read uint64 without possibly observing a torn value. |
| */ |
| static inline void |
| XLogPrefetchIncrement(pg_atomic_uint64 *counter) |
| { |
| Assert(AmStartupProcess() || !IsUnderPostmaster); |
| pg_atomic_write_u64(counter, pg_atomic_read_u64(counter) + 1); |
| } |
| |
| /* |
| * Create a prefetcher that is ready to begin prefetching blocks referenced by |
| * WAL records. |
| */ |
| XLogPrefetcher * |
| XLogPrefetcherAllocate(XLogReaderState *reader) |
| { |
| XLogPrefetcher *prefetcher; |
| static HASHCTL hash_table_ctl = { |
| .keysize = sizeof(RelFileLocator), |
| .entrysize = sizeof(XLogPrefetcherFilter) |
| }; |
| |
| prefetcher = palloc0(sizeof(XLogPrefetcher)); |
| |
| prefetcher->reader = reader; |
| prefetcher->filter_table = hash_create("XLogPrefetcherFilterTable", 1024, |
| &hash_table_ctl, |
| HASH_ELEM | HASH_BLOBS); |
| dlist_init(&prefetcher->filter_queue); |
| |
| SharedStats->wal_distance = 0; |
| SharedStats->block_distance = 0; |
| SharedStats->io_depth = 0; |
| |
| /* First usage will cause streaming_read to be allocated. */ |
| prefetcher->reconfigure_count = XLogPrefetchReconfigureCount - 1; |
| |
| return prefetcher; |
| } |
| |
| /* |
| * Destroy a prefetcher and release all resources. |
| */ |
| void |
| XLogPrefetcherFree(XLogPrefetcher *prefetcher) |
| { |
| lrq_free(prefetcher->streaming_read); |
| hash_destroy(prefetcher->filter_table); |
| pfree(prefetcher); |
| } |
| |
| /* |
| * Provide access to the reader. |
| */ |
| XLogReaderState * |
| XLogPrefetcherGetReader(XLogPrefetcher *prefetcher) |
| { |
| return prefetcher->reader; |
| } |
| |
| /* |
| * Update the statistics visible in the pg_stat_recovery_prefetch view. |
| */ |
| void |
| XLogPrefetcherComputeStats(XLogPrefetcher *prefetcher) |
| { |
| uint32 io_depth; |
| uint32 completed; |
| int64 wal_distance; |
| |
| /* How far ahead of replay are we now? */ |
| if (prefetcher->reader->decode_queue_tail) |
| { |
| wal_distance = |
| prefetcher->reader->decode_queue_tail->lsn - |
| prefetcher->reader->decode_queue_head->lsn; |
| } |
| else |
| { |
| wal_distance = 0; |
| } |
| |
| /* How many IOs are currently in flight and completed? */ |
| io_depth = lrq_inflight(prefetcher->streaming_read); |
| completed = lrq_completed(prefetcher->streaming_read); |
| |
| /* Update the instantaneous stats visible in pg_stat_recovery_prefetch. */ |
| SharedStats->io_depth = io_depth; |
| SharedStats->block_distance = io_depth + completed; |
| SharedStats->wal_distance = wal_distance; |
| |
| prefetcher->next_stats_shm_lsn = |
| prefetcher->reader->ReadRecPtr + XLOGPREFETCHER_STATS_DISTANCE; |
| } |
| |
| /* |
| * A callback that examines the next block reference in the WAL, and possibly |
| * starts an IO so that a later read will be fast. |
| * |
| * Returns LRQ_NEXT_AGAIN if no more WAL data is available yet. |
| * |
| * Returns LRQ_NEXT_IO if the next block reference is for a main fork block |
| * that isn't in the buffer pool, and the kernel has been asked to start |
| * reading it to make a future read system call faster. An LSN is written to |
| * *lsn, and the I/O will be considered to have completed once that LSN is |
| * replayed. |
| * |
| * Returns LRQ_NEXT_NO_IO if we examined the next block reference and found |
| * that it was already in the buffer pool, or we decided for various reasons |
| * not to prefetch. |
| */ |
| static LsnReadQueueNextStatus |
| XLogPrefetcherNextBlock(uintptr_t pgsr_private, XLogRecPtr *lsn) |
| { |
| XLogPrefetcher *prefetcher = (XLogPrefetcher *) pgsr_private; |
| XLogReaderState *reader = prefetcher->reader; |
| XLogRecPtr replaying_lsn = reader->ReadRecPtr; |
| |
| /* |
| * We keep track of the record and block we're up to between calls with |
| * prefetcher->record and prefetcher->next_block_id. |
| */ |
| for (;;) |
| { |
| DecodedXLogRecord *record; |
| |
| /* Try to read a new future record, if we don't already have one. */ |
| if (prefetcher->record == NULL) |
| { |
| bool nonblocking; |
| |
| /* |
| * If there are already records or an error queued up that could |
| * be replayed, we don't want to block here. Otherwise, it's OK |
| * to block waiting for more data: presumably the caller has |
| * nothing else to do. |
| */ |
| nonblocking = XLogReaderHasQueuedRecordOrError(reader); |
| |
| /* Readahead is disabled until we replay past a certain point. */ |
| if (nonblocking && replaying_lsn <= prefetcher->no_readahead_until) |
| return LRQ_NEXT_AGAIN; |
| |
| record = XLogReadAhead(prefetcher->reader, nonblocking); |
| if (record == NULL) |
| { |
| /* |
| * We can't read any more, due to an error or lack of data in |
| * nonblocking mode. Don't try to read ahead again until |
| * we've replayed everything already decoded. |
| */ |
| if (nonblocking && prefetcher->reader->decode_queue_tail) |
| prefetcher->no_readahead_until = |
| prefetcher->reader->decode_queue_tail->lsn; |
| |
| return LRQ_NEXT_AGAIN; |
| } |
| |
| /* |
| * If prefetching is disabled, we don't need to analyze the record |
| * or issue any prefetches. We just need to cause one record to |
| * be decoded. |
| */ |
| if (!RecoveryPrefetchEnabled()) |
| { |
| *lsn = InvalidXLogRecPtr; |
| return LRQ_NEXT_NO_IO; |
| } |
| |
| /* We have a new record to process. */ |
| prefetcher->record = record; |
| prefetcher->next_block_id = 0; |
| } |
| else |
| { |
| /* Continue to process from last call, or last loop. */ |
| record = prefetcher->record; |
| } |
| |
| /* |
| * Check for operations that require us to filter out block ranges, or |
| * pause readahead completely. |
| */ |
| if (replaying_lsn < record->lsn) |
| { |
| uint8 rmid = record->header.xl_rmid; |
| uint8 record_type = record->header.xl_info & ~XLR_INFO_MASK; |
| |
| if (rmid == RM_XLOG_ID) |
| { |
| if (record_type == XLOG_CHECKPOINT_SHUTDOWN || |
| record_type == XLOG_END_OF_RECOVERY) |
| { |
| /* |
| * These records might change the TLI. Avoid potential |
| * bugs if we were to allow "read TLI" and "replay TLI" to |
| * differ without more analysis. |
| */ |
| prefetcher->no_readahead_until = record->lsn; |
| |
| #ifdef XLOGPREFETCHER_DEBUG_LEVEL |
| elog(XLOGPREFETCHER_DEBUG_LEVEL, |
| "suppressing all readahead until %X/%X is replayed due to possible TLI change", |
| LSN_FORMAT_ARGS(record->lsn)); |
| #endif |
| |
| /* Fall through so we move past this record. */ |
| } |
| } |
| else if (rmid == RM_DBASE_ID) |
| { |
| /* |
| * When databases are created with the file-copy strategy, |
| * there are no WAL records to tell us about the creation of |
| * individual relations. |
| */ |
| if (record_type == XLOG_DBASE_CREATE_FILE_COPY) |
| { |
| xl_dbase_create_file_copy_rec *xlrec = |
| (xl_dbase_create_file_copy_rec *) record->main_data; |
| RelFileLocator rlocator = |
| {InvalidOid, xlrec->db_id, InvalidRelFileNumber}; |
| |
| /* |
| * Don't try to prefetch anything in this database until |
| * it has been created, or we might confuse the blocks of |
| * different generations, if a database OID or |
| * relfilenumber is reused. It's also more efficient than |
| * discovering that relations don't exist on disk yet with |
| * ENOENT errors. |
| */ |
| XLogPrefetcherAddFilter(prefetcher, rlocator, 0, record->lsn); |
| |
| #ifdef XLOGPREFETCHER_DEBUG_LEVEL |
| elog(XLOGPREFETCHER_DEBUG_LEVEL, |
| "suppressing prefetch in database %u until %X/%X is replayed due to raw file copy", |
| rlocator.dbOid, |
| LSN_FORMAT_ARGS(record->lsn)); |
| #endif |
| } |
| } |
| else if (rmid == RM_SMGR_ID) |
| { |
| if (record_type == XLOG_SMGR_CREATE) |
| { |
| xl_smgr_create *xlrec = (xl_smgr_create *) |
| record->main_data; |
| |
| if (xlrec->forkNum == MAIN_FORKNUM) |
| { |
| /* |
| * Don't prefetch anything for this whole relation |
| * until it has been created. Otherwise we might |
| * confuse the blocks of different generations, if a |
| * relfilenumber is reused. This also avoids the need |
| * to discover the problem via extra syscalls that |
| * report ENOENT. |
| */ |
| XLogPrefetcherAddFilter(prefetcher, xlrec->rlocator, 0, |
| record->lsn); |
| |
| #ifdef XLOGPREFETCHER_DEBUG_LEVEL |
| elog(XLOGPREFETCHER_DEBUG_LEVEL, |
| "suppressing prefetch in relation %u/%u/%u until %X/%X is replayed, which creates the relation", |
| xlrec->rlocator.spcOid, |
| xlrec->rlocator.dbOid, |
| xlrec->rlocator.relNumber, |
| LSN_FORMAT_ARGS(record->lsn)); |
| #endif |
| } |
| } |
| else if (record_type == XLOG_SMGR_TRUNCATE) |
| { |
| xl_smgr_truncate *xlrec = (xl_smgr_truncate *) |
| record->main_data; |
| |
| /* |
| * Don't consider prefetching anything in the truncated |
| * range until the truncation has been performed. |
| */ |
| XLogPrefetcherAddFilter(prefetcher, xlrec->rlocator, |
| xlrec->blkno, |
| record->lsn); |
| |
| #ifdef XLOGPREFETCHER_DEBUG_LEVEL |
| elog(XLOGPREFETCHER_DEBUG_LEVEL, |
| "suppressing prefetch in relation %u/%u/%u from block %u until %X/%X is replayed, which truncates the relation", |
| xlrec->rlocator.spcOid, |
| xlrec->rlocator.dbOid, |
| xlrec->rlocator.relNumber, |
| xlrec->blkno, |
| LSN_FORMAT_ARGS(record->lsn)); |
| #endif |
| } |
| } |
| } |
| |
| /* Scan the block references, starting where we left off last time. */ |
| while (prefetcher->next_block_id <= record->max_block_id) |
| { |
| int block_id = prefetcher->next_block_id++; |
| DecodedBkpBlock *block = &record->blocks[block_id]; |
| SMgrRelation reln; |
| PrefetchBufferResult result; |
| |
| if (!block->in_use) |
| continue; |
| |
| Assert(!BufferIsValid(block->prefetch_buffer)); |
| |
| /* |
| * Record the LSN of this record. When it's replayed, |
| * LsnReadQueue will consider any IOs submitted for earlier LSNs |
| * to be finished. |
| */ |
| *lsn = record->lsn; |
| |
| /* We don't try to prefetch anything but the main fork for now. */ |
| if (block->forknum != MAIN_FORKNUM) |
| { |
| return LRQ_NEXT_NO_IO; |
| } |
| |
| /* |
| * If there is a full page image attached, we won't be reading the |
| * page, so don't bother trying to prefetch. |
| */ |
| if (block->has_image) |
| { |
| XLogPrefetchIncrement(&SharedStats->skip_fpw); |
| return LRQ_NEXT_NO_IO; |
| } |
| |
| /* There is no point in reading a page that will be zeroed. */ |
| if (block->flags & BKPBLOCK_WILL_INIT) |
| { |
| XLogPrefetchIncrement(&SharedStats->skip_init); |
| return LRQ_NEXT_NO_IO; |
| } |
| |
| /* Should we skip prefetching this block due to a filter? */ |
| if (XLogPrefetcherIsFiltered(prefetcher, block->rlocator, block->blkno)) |
| { |
| XLogPrefetchIncrement(&SharedStats->skip_new); |
| return LRQ_NEXT_NO_IO; |
| } |
| |
| /* There is no point in repeatedly prefetching the same block. */ |
| for (int i = 0; i < XLOGPREFETCHER_SEQ_WINDOW_SIZE; ++i) |
| { |
| if (block->blkno == prefetcher->recent_block[i] && |
| RelFileLocatorEquals(block->rlocator, prefetcher->recent_rlocator[i])) |
| { |
| /* |
| * XXX If we also remembered where it was, we could set |
| * recent_buffer so that recovery could skip smgropen() |
| * and a buffer table lookup. |
| */ |
| XLogPrefetchIncrement(&SharedStats->skip_rep); |
| return LRQ_NEXT_NO_IO; |
| } |
| } |
| prefetcher->recent_rlocator[prefetcher->recent_idx] = block->rlocator; |
| prefetcher->recent_block[prefetcher->recent_idx] = block->blkno; |
| prefetcher->recent_idx = |
| (prefetcher->recent_idx + 1) % XLOGPREFETCHER_SEQ_WINDOW_SIZE; |
| |
| /* |
| * We could try to have a fast path for repeated references to the |
| * same relation (with some scheme to handle invalidations |
| * safely), but for now we'll call smgropen() every time. |
| */ |
| reln = smgropen(block->rlocator, InvalidBackendId); |
| |
| /* |
| * If the relation file doesn't exist on disk, for example because |
| * we're replaying after a crash and the file will be created and |
| * then unlinked by WAL that hasn't been replayed yet, suppress |
| * further prefetching in the relation until this record is |
| * replayed. |
| */ |
| if (!smgrexists(reln, MAIN_FORKNUM)) |
| { |
| #ifdef XLOGPREFETCHER_DEBUG_LEVEL |
| elog(XLOGPREFETCHER_DEBUG_LEVEL, |
| "suppressing all prefetch in relation %u/%u/%u until %X/%X is replayed, because the relation does not exist on disk", |
| reln->smgr_rlocator.locator.spcOid, |
| reln->smgr_rlocator.locator.dbOid, |
| reln->smgr_rlocator.locator.relNumber, |
| LSN_FORMAT_ARGS(record->lsn)); |
| #endif |
| XLogPrefetcherAddFilter(prefetcher, block->rlocator, 0, |
| record->lsn); |
| XLogPrefetchIncrement(&SharedStats->skip_new); |
| return LRQ_NEXT_NO_IO; |
| } |
| |
| /* |
| * If the relation isn't big enough to contain the referenced |
| * block yet, suppress prefetching of this block and higher until |
| * this record is replayed. |
| */ |
| if (block->blkno >= smgrnblocks(reln, block->forknum)) |
| { |
| #ifdef XLOGPREFETCHER_DEBUG_LEVEL |
| elog(XLOGPREFETCHER_DEBUG_LEVEL, |
| "suppressing prefetch in relation %u/%u/%u from block %u until %X/%X is replayed, because the relation is too small", |
| reln->smgr_rlocator.locator.spcOid, |
| reln->smgr_rlocator.locator.dbOid, |
| reln->smgr_rlocator.locator.relNumber, |
| block->blkno, |
| LSN_FORMAT_ARGS(record->lsn)); |
| #endif |
| XLogPrefetcherAddFilter(prefetcher, block->rlocator, block->blkno, |
| record->lsn); |
| XLogPrefetchIncrement(&SharedStats->skip_new); |
| return LRQ_NEXT_NO_IO; |
| } |
| |
| /* Try to initiate prefetching. */ |
| result = PrefetchSharedBuffer(reln, block->forknum, block->blkno); |
| if (BufferIsValid(result.recent_buffer)) |
| { |
| /* Cache hit, nothing to do. */ |
| XLogPrefetchIncrement(&SharedStats->hit); |
| block->prefetch_buffer = result.recent_buffer; |
| return LRQ_NEXT_NO_IO; |
| } |
| else if (result.initiated_io) |
| { |
| /* Cache miss, I/O (presumably) started. */ |
| XLogPrefetchIncrement(&SharedStats->prefetch); |
| block->prefetch_buffer = InvalidBuffer; |
| return LRQ_NEXT_IO; |
| } |
| else if ((io_direct_flags & IO_DIRECT_DATA) == 0) |
| { |
| /* |
| * This shouldn't be possible, because we already determined |
| * that the relation exists on disk and is big enough. |
| * Something is wrong with the cache invalidation for |
| * smgrexists(), smgrnblocks(), or the file was unlinked or |
| * truncated beneath our feet? |
| */ |
| elog(ERROR, |
| "could not prefetch relation %u/%u/%u block %u", |
| reln->smgr_rlocator.locator.spcOid, |
| reln->smgr_rlocator.locator.dbOid, |
| reln->smgr_rlocator.locator.relNumber, |
| block->blkno); |
| } |
| } |
| |
| /* |
| * Several callsites need to be able to read exactly one record |
| * without any internal readahead. Examples: xlog.c reading |
| * checkpoint records with emode set to PANIC, which might otherwise |
| * cause XLogPageRead() to panic on some future page, and xlog.c |
| * determining where to start writing WAL next, which depends on the |
| * contents of the reader's internal buffer after reading one record. |
| * Therefore, don't even think about prefetching until the first |
| * record after XLogPrefetcherBeginRead() has been consumed. |
| */ |
| if (prefetcher->reader->decode_queue_tail && |
| prefetcher->reader->decode_queue_tail->lsn == prefetcher->begin_ptr) |
| return LRQ_NEXT_AGAIN; |
| |
| /* Advance to the next record. */ |
| prefetcher->record = NULL; |
| } |
| pg_unreachable(); |
| } |
| |
| /* |
| * Expose statistics about recovery prefetching. |
| */ |
| Datum |
| pg_stat_get_recovery_prefetch(PG_FUNCTION_ARGS) |
| { |
| #define PG_STAT_GET_RECOVERY_PREFETCH_COLS 10 |
| ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; |
| Datum values[PG_STAT_GET_RECOVERY_PREFETCH_COLS]; |
| bool nulls[PG_STAT_GET_RECOVERY_PREFETCH_COLS]; |
| |
| InitMaterializedSRF(fcinfo, 0); |
| |
| for (int i = 0; i < PG_STAT_GET_RECOVERY_PREFETCH_COLS; ++i) |
| nulls[i] = false; |
| |
| values[0] = TimestampTzGetDatum(pg_atomic_read_u64(&SharedStats->reset_time)); |
| values[1] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->prefetch)); |
| values[2] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->hit)); |
| values[3] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_init)); |
| values[4] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_new)); |
| values[5] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_fpw)); |
| values[6] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_rep)); |
| values[7] = Int32GetDatum(SharedStats->wal_distance); |
| values[8] = Int32GetDatum(SharedStats->block_distance); |
| values[9] = Int32GetDatum(SharedStats->io_depth); |
| tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls); |
| |
| return (Datum) 0; |
| } |
| |
| /* |
| * Don't prefetch any blocks >= 'blockno' from a given 'rlocator', until 'lsn' |
| * has been replayed. |
| */ |
| static inline void |
| XLogPrefetcherAddFilter(XLogPrefetcher *prefetcher, RelFileLocator rlocator, |
| BlockNumber blockno, XLogRecPtr lsn) |
| { |
| XLogPrefetcherFilter *filter; |
| bool found; |
| |
| filter = hash_search(prefetcher->filter_table, &rlocator, HASH_ENTER, &found); |
| if (!found) |
| { |
| /* |
| * Don't allow any prefetching of this block or higher until replayed. |
| */ |
| filter->filter_until_replayed = lsn; |
| filter->filter_from_block = blockno; |
| dlist_push_head(&prefetcher->filter_queue, &filter->link); |
| } |
| else |
| { |
| /* |
| * We were already filtering this rlocator. Extend the filter's |
| * lifetime to cover this WAL record, but leave the lower of the block |
| * numbers there because we don't want to have to track individual |
| * blocks. |
| */ |
| filter->filter_until_replayed = lsn; |
| dlist_delete(&filter->link); |
| dlist_push_head(&prefetcher->filter_queue, &filter->link); |
| filter->filter_from_block = Min(filter->filter_from_block, blockno); |
| } |
| } |
| |
| /* |
| * Have we replayed any records that caused us to begin filtering a block |
| * range? That means that relations should have been created, extended or |
| * dropped as required, so we can stop filtering out accesses to a given |
| * relfilenumber. |
| */ |
| static inline void |
| XLogPrefetcherCompleteFilters(XLogPrefetcher *prefetcher, XLogRecPtr replaying_lsn) |
| { |
| while (unlikely(!dlist_is_empty(&prefetcher->filter_queue))) |
| { |
| XLogPrefetcherFilter *filter = dlist_tail_element(XLogPrefetcherFilter, |
| link, |
| &prefetcher->filter_queue); |
| |
| if (filter->filter_until_replayed >= replaying_lsn) |
| break; |
| |
| dlist_delete(&filter->link); |
| hash_search(prefetcher->filter_table, filter, HASH_REMOVE, NULL); |
| } |
| } |
| |
| /* |
| * Check if a given block should be skipped due to a filter. |
| */ |
| static inline bool |
| XLogPrefetcherIsFiltered(XLogPrefetcher *prefetcher, RelFileLocator rlocator, |
| BlockNumber blockno) |
| { |
| /* |
| * Test for empty queue first, because we expect it to be empty most of |
| * the time and we can avoid the hash table lookup in that case. |
| */ |
| if (unlikely(!dlist_is_empty(&prefetcher->filter_queue))) |
| { |
| XLogPrefetcherFilter *filter; |
| |
| /* See if the block range is filtered. */ |
| filter = hash_search(prefetcher->filter_table, &rlocator, HASH_FIND, NULL); |
| if (filter && filter->filter_from_block <= blockno) |
| { |
| #ifdef XLOGPREFETCHER_DEBUG_LEVEL |
| elog(XLOGPREFETCHER_DEBUG_LEVEL, |
| "prefetch of %u/%u/%u block %u suppressed; filtering until LSN %X/%X is replayed (blocks >= %u filtered)", |
| rlocator.spcOid, rlocator.dbOid, rlocator.relNumber, blockno, |
| LSN_FORMAT_ARGS(filter->filter_until_replayed), |
| filter->filter_from_block); |
| #endif |
| return true; |
| } |
| |
| /* See if the whole database is filtered. */ |
| rlocator.relNumber = InvalidRelFileNumber; |
| rlocator.spcOid = InvalidOid; |
| filter = hash_search(prefetcher->filter_table, &rlocator, HASH_FIND, NULL); |
| if (filter) |
| { |
| #ifdef XLOGPREFETCHER_DEBUG_LEVEL |
| elog(XLOGPREFETCHER_DEBUG_LEVEL, |
| "prefetch of %u/%u/%u block %u suppressed; filtering until LSN %X/%X is replayed (whole database)", |
| rlocator.spcOid, rlocator.dbOid, rlocator.relNumber, blockno, |
| LSN_FORMAT_ARGS(filter->filter_until_replayed)); |
| #endif |
| return true; |
| } |
| } |
| |
| return false; |
| } |
| |
| /* |
| * A wrapper for XLogBeginRead() that also resets the prefetcher. |
| */ |
| void |
| XLogPrefetcherBeginRead(XLogPrefetcher *prefetcher, XLogRecPtr recPtr) |
| { |
| /* This will forget about any in-flight IO. */ |
| prefetcher->reconfigure_count--; |
| |
| /* Book-keeping to avoid readahead on first read. */ |
| prefetcher->begin_ptr = recPtr; |
| |
| prefetcher->no_readahead_until = 0; |
| |
| /* This will forget about any queued up records in the decoder. */ |
| XLogBeginRead(prefetcher->reader, recPtr); |
| } |
| |
| /* |
| * A wrapper for XLogReadRecord() that provides the same interface, but also |
| * tries to initiate I/O for blocks referenced in future WAL records. |
| */ |
| XLogRecord * |
| XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher, char **errmsg) |
| { |
| DecodedXLogRecord *record; |
| XLogRecPtr replayed_up_to; |
| |
| /* |
| * See if it's time to reset the prefetching machinery, because a relevant |
| * GUC was changed. |
| */ |
| if (unlikely(XLogPrefetchReconfigureCount != prefetcher->reconfigure_count)) |
| { |
| uint32 max_distance; |
| uint32 max_inflight; |
| |
| if (prefetcher->streaming_read) |
| lrq_free(prefetcher->streaming_read); |
| |
| if (RecoveryPrefetchEnabled()) |
| { |
| Assert(maintenance_io_concurrency > 0); |
| max_inflight = maintenance_io_concurrency; |
| max_distance = max_inflight * XLOGPREFETCHER_DISTANCE_MULTIPLIER; |
| } |
| else |
| { |
| max_inflight = 1; |
| max_distance = 1; |
| } |
| |
| prefetcher->streaming_read = lrq_alloc(max_distance, |
| max_inflight, |
| (uintptr_t) prefetcher, |
| XLogPrefetcherNextBlock); |
| |
| prefetcher->reconfigure_count = XLogPrefetchReconfigureCount; |
| } |
| |
| /* |
| * Release last returned record, if there is one, as it's now been |
| * replayed. |
| */ |
| replayed_up_to = XLogReleasePreviousRecord(prefetcher->reader); |
| |
| /* |
| * Can we drop any filters yet? If we were waiting for a relation to be |
| * created or extended, it is now OK to access blocks in the covered |
| * range. |
| */ |
| XLogPrefetcherCompleteFilters(prefetcher, replayed_up_to); |
| |
| /* |
| * All IO initiated by earlier WAL is now completed. This might trigger |
| * further prefetching. |
| */ |
| lrq_complete_lsn(prefetcher->streaming_read, replayed_up_to); |
| |
| /* |
| * If there's nothing queued yet, then start prefetching to cause at least |
| * one record to be queued. |
| */ |
| if (!XLogReaderHasQueuedRecordOrError(prefetcher->reader)) |
| { |
| Assert(lrq_inflight(prefetcher->streaming_read) == 0); |
| Assert(lrq_completed(prefetcher->streaming_read) == 0); |
| lrq_prefetch(prefetcher->streaming_read); |
| } |
| |
| /* Read the next record. */ |
| record = XLogNextRecord(prefetcher->reader, errmsg); |
| if (!record) |
| return NULL; |
| |
| /* |
| * The record we just got is the "current" one, for the benefit of the |
| * XLogRecXXX() macros. |
| */ |
| Assert(record == prefetcher->reader->record); |
| |
| /* |
| * If maintenance_io_concurrency is set very low, we might have started |
| * prefetching some but not all of the blocks referenced in the record |
| * we're about to return. Forget about the rest of the blocks in this |
| * record by dropping the prefetcher's reference to it. |
| */ |
| if (record == prefetcher->record) |
| prefetcher->record = NULL; |
| |
| /* |
| * See if it's time to compute some statistics, because enough WAL has |
| * been processed. |
| */ |
| if (unlikely(record->lsn >= prefetcher->next_stats_shm_lsn)) |
| XLogPrefetcherComputeStats(prefetcher); |
| |
| Assert(record == prefetcher->reader->record); |
| |
| return &record->header; |
| } |
| |
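| /* |
| * GUC check hook for recovery_prefetch. |
| */ |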
| bool |
| check_recovery_prefetch(int *new_value, void **extra, GucSource source) |
| { |
| #ifndef USE_PREFETCH |
| if (*new_value == RECOVERY_PREFETCH_ON) |
| { |
| GUC_check_errdetail("recovery_prefetch is not supported on platforms that lack posix_fadvise()."); |
| return false; |
| } |
| #endif |
| |
| return true; |
| } |
| |
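| /* |
| * GUC assign hook for recovery_prefetch. |
| */ |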
| void |
| assign_recovery_prefetch(int new_value, void *extra) |
| { |
| /* Reconfigure prefetching, because a setting it depends on changed. */ |
| recovery_prefetch = new_value; |
| if (AmStartupProcess()) |
| XLogPrefetchReconfigure(); |
| } |