| /*------------------------------------------------------------------------- |
| * |
| * basebackup_throttle.c |
| * Basebackup sink implementing throttling. Data is forwarded to the |
| * next base backup sink in the chain at a rate no greater than the |
| * configured maximum. |
| * |
| * Portions Copyright (c) 2010-2023, PostgreSQL Global Development Group |
| * |
| * IDENTIFICATION |
| * src/backend/backup/basebackup_throttle.c |
| * |
| *------------------------------------------------------------------------- |
| */ |
| #include "postgres.h" |
| |
| #include "backup/basebackup_sink.h" |
| #include "miscadmin.h" |
| #include "pgstat.h" |
| #include "storage/latch.h" |
| #include "utils/timestamp.h" |
| |
| typedef struct bbsink_throttle |
| { |
| /* Common information for all types of sink. */ |
| bbsink base; |
| |
| /* The actual number of bytes, transfer of which may cause sleep. */ |
| uint64 throttling_sample; |
| |
| /* Amount of data already transferred but not yet throttled. */ |
| int64 throttling_counter; |
| |
| /* The minimum time required to transfer throttling_sample bytes. */ |
| TimeOffset elapsed_min_unit; |
| |
| /* The last check of the transfer rate. */ |
| TimestampTz throttled_last; |
| } bbsink_throttle; |
| |
| static void bbsink_throttle_begin_backup(bbsink *sink); |
| static void bbsink_throttle_archive_contents(bbsink *sink, size_t len); |
| static void bbsink_throttle_manifest_contents(bbsink *sink, size_t len); |
| static void throttle(bbsink_throttle *sink, size_t increment); |
| |
| static const bbsink_ops bbsink_throttle_ops = { |
| .begin_backup = bbsink_throttle_begin_backup, |
| .begin_archive = bbsink_forward_begin_archive, |
| .archive_contents = bbsink_throttle_archive_contents, |
| .end_archive = bbsink_forward_end_archive, |
| .begin_manifest = bbsink_forward_begin_manifest, |
| .manifest_contents = bbsink_throttle_manifest_contents, |
| .end_manifest = bbsink_forward_end_manifest, |
| .end_backup = bbsink_forward_end_backup, |
| .cleanup = bbsink_forward_cleanup |
| }; |
| |
| /* |
| * How frequently to throttle, as a fraction of the specified rate-second. |
| */ |
| #define THROTTLING_FREQUENCY 8 |
| |
| /* |
| * Create a new basebackup sink that performs throttling and forwards data |
| * to a successor sink. |
| */ |
| bbsink * |
| bbsink_throttle_new(bbsink *next, uint32 maxrate) |
| { |
| bbsink_throttle *sink; |
| |
| Assert(next != NULL); |
| Assert(maxrate > 0); |
| |
| sink = palloc0(sizeof(bbsink_throttle)); |
| *((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_throttle_ops; |
| sink->base.bbs_next = next; |
| |
| sink->throttling_sample = |
| (int64) maxrate * (int64) 1024 / THROTTLING_FREQUENCY; |
| |
| /* |
| * The minimum amount of time for throttling_sample bytes to be |
| * transferred. |
| */ |
| sink->elapsed_min_unit = USECS_PER_SEC / THROTTLING_FREQUENCY; |
| |
| return &sink->base; |
| } |
| |
| /* |
| * There's no real work to do here, but we need to record the current time so |
| * that it can be used for future calculations. |
| */ |
| static void |
| bbsink_throttle_begin_backup(bbsink *sink) |
| { |
| bbsink_throttle *mysink = (bbsink_throttle *) sink; |
| |
| bbsink_forward_begin_backup(sink); |
| |
| /* The 'real data' starts now (header was ignored). */ |
| mysink->throttled_last = GetCurrentTimestamp(); |
| } |
| |
| /* |
| * First throttle, and then pass archive contents to next sink. |
| */ |
| static void |
| bbsink_throttle_archive_contents(bbsink *sink, size_t len) |
| { |
| throttle((bbsink_throttle *) sink, len); |
| |
| bbsink_forward_archive_contents(sink, len); |
| } |
| |
| /* |
| * First throttle, and then pass manifest contents to next sink. |
| */ |
| static void |
| bbsink_throttle_manifest_contents(bbsink *sink, size_t len) |
| { |
| throttle((bbsink_throttle *) sink, len); |
| |
| bbsink_forward_manifest_contents(sink, len); |
| } |
| |
| /* |
| * Increment the network transfer counter by the given number of bytes, |
| * and sleep if necessary to comply with the requested network transfer |
| * rate. |
| */ |
| static void |
| throttle(bbsink_throttle *sink, size_t increment) |
| { |
| TimeOffset elapsed_min; |
| |
| Assert(sink->throttling_counter >= 0); |
| |
| sink->throttling_counter += increment; |
| if (sink->throttling_counter < sink->throttling_sample) |
| return; |
| |
| /* How much time should have elapsed at minimum? */ |
| elapsed_min = sink->elapsed_min_unit * |
| (sink->throttling_counter / sink->throttling_sample); |
| |
| /* |
| * Since the latch could be set repeatedly because of concurrently WAL |
| * activity, sleep in a loop to ensure enough time has passed. |
| */ |
| for (;;) |
| { |
| TimeOffset elapsed, |
| sleep; |
| int wait_result; |
| |
| /* Time elapsed since the last measurement (and possible wake up). */ |
| elapsed = GetCurrentTimestamp() - sink->throttled_last; |
| |
| /* sleep if the transfer is faster than it should be */ |
| sleep = elapsed_min - elapsed; |
| if (sleep <= 0) |
| break; |
| |
| ResetLatch(MyLatch); |
| |
| /* We're eating a potentially set latch, so check for interrupts */ |
| CHECK_FOR_INTERRUPTS(); |
| |
| /* |
| * (TAR_SEND_SIZE / throttling_sample * elapsed_min_unit) should be |
| * the maximum time to sleep. Thus the cast to long is safe. |
| */ |
| wait_result = WaitLatch(MyLatch, |
| WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, |
| (long) (sleep / 1000), |
| WAIT_EVENT_BASE_BACKUP_THROTTLE); |
| |
| if (wait_result & WL_LATCH_SET) |
| CHECK_FOR_INTERRUPTS(); |
| |
| /* Done waiting? */ |
| if (wait_result & WL_TIMEOUT) |
| break; |
| } |
| |
| /* |
| * As we work with integers, only whole multiple of throttling_sample was |
| * processed. The rest will be done during the next call of this function. |
| */ |
| sink->throttling_counter %= sink->throttling_sample; |
| |
| /* |
| * Time interval for the remaining amount and possible next increments |
| * starts now. |
| */ |
| sink->throttled_last = GetCurrentTimestamp(); |
| } |