| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.ignite.internal.processors.cache.persistence.pagemem; |
| |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.atomic.AtomicLong; |
| import java.util.concurrent.locks.LockSupport; |
| import org.apache.ignite.IgniteLogger; |
| import org.apache.ignite.internal.processors.cache.persistence.CheckpointLockStateChecker; |
| import org.apache.ignite.internal.processors.cache.persistence.CheckpointWriteProgressSupplier; |
| import org.apache.ignite.internal.util.GridConcurrentHashSet; |
| import org.apache.ignite.internal.util.typedef.internal.U; |
| |
| /** |
| * Throttles threads that generate dirty pages during ongoing checkpoint. |
| * Designed to avoid zero dropdowns that can happen if checkpoint buffer is overflowed. |
| * Uses average checkpoint write speed and moment speed of marking pages as dirty. |
| */ |
| public class PagesWriteSpeedBasedThrottle implements PagesWriteThrottlePolicy { |
| /** Maximum dirty pages in region. */ |
| private static final double MAX_DIRTY_PAGES = 0.75; |
| |
| /** Page memory. */ |
| private final PageMemoryImpl pageMemory; |
| |
| /** Database manager. */ |
| private final CheckpointWriteProgressSupplier cpProgress; |
| |
| /** Starting throttle time. Limits write speed to 1000 MB/s. */ |
| private static final long STARTING_THROTTLE_NANOS = 4000; |
| |
| /** Backoff ratio. Each next park will be this times longer. */ |
| private static final double BACKOFF_RATIO = 1.05; |
| |
| /** Percent of dirty pages which will not cause throttling. */ |
| private static final double MIN_RATIO_NO_THROTTLE = 0.03; |
| |
| /** Exponential backoff counter. */ |
| private final AtomicInteger exponentialBackoffCntr = new AtomicInteger(0); |
| |
| /** Counter of written pages from checkpoint. Value is saved here for detecting checkpoint start. */ |
| private final AtomicInteger lastObservedWritten = new AtomicInteger(0); |
| |
| /** |
| * Dirty pages ratio was observed at checkpoint start (here start is moment when first page was actually saved to |
| * store). This ratio is excluded from throttling. |
| */ |
| private volatile double initDirtyRatioAtCpBegin = MIN_RATIO_NO_THROTTLE; |
| |
| /** |
| * Target (maximum) dirty pages ratio, after which throttling will start using |
| * {@link #getParkTime(double, long, int, int, long, long)}. |
| */ |
| private volatile double targetDirtyRatio; |
| |
| /** |
| * Current dirty pages ratio (percent of dirty pages in most used segment), negative value means no cp is running. |
| */ |
| private volatile double currDirtyRatio; |
| |
| /** Speed average checkpoint write speed. Current and 3 past checkpoints used. Pages/second. */ |
| private final IntervalBasedMeasurement speedCpWrite = new IntervalBasedMeasurement(); |
| |
| /** Last estimated speed for marking all clear pages as dirty till the end of checkpoint. */ |
| private volatile long speedForMarkAll; |
| |
| /** Threads set. Contains identifiers of all threads which were marking pages for current checkpoint. */ |
| private final GridConcurrentHashSet<Long> threadIds = new GridConcurrentHashSet<>(); |
| |
| /** |
| * Used for calculating speed of marking pages dirty. |
| * Value from past 750-1000 millis only. |
| * {@link IntervalBasedMeasurement#getSpeedOpsPerSec(long)} returns pages marked/second. |
| * {@link IntervalBasedMeasurement#getAverage()} returns average throttle time. |
| * */ |
| private final IntervalBasedMeasurement speedMarkAndAvgParkTime = new IntervalBasedMeasurement(250, 3); |
| |
| /** Total pages which is possible to store in page memory. */ |
| private long totalPages; |
| |
| /** Checkpoint lock state provider. */ |
| private CheckpointLockStateChecker cpLockStateChecker; |
| |
| /** Logger. */ |
| private IgniteLogger log; |
| |
| /** Previous warning time, nanos. */ |
| private AtomicLong prevWarnTime = new AtomicLong(); |
| |
| /** Warning min delay nanoseconds. */ |
| private static final long WARN_MIN_DELAY_NS = TimeUnit.SECONDS.toNanos(10); |
| |
| /** Warning threshold: minimal level of pressure that causes warning messages to log. */ |
| static final double WARN_THRESHOLD = 0.2; |
| |
| /** |
| * @param pageMemory Page memory. |
| * @param cpProgress Database manager. |
| * @param stateChecker Checkpoint lock state provider. |
| * @param log Logger. |
| */ |
| public PagesWriteSpeedBasedThrottle( |
| PageMemoryImpl pageMemory, |
| CheckpointWriteProgressSupplier cpProgress, |
| CheckpointLockStateChecker stateChecker, |
| IgniteLogger log |
| ) { |
| this.pageMemory = pageMemory; |
| this.cpProgress = cpProgress; |
| totalPages = pageMemory.totalPages(); |
| this.cpLockStateChecker = stateChecker; |
| this.log = log; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void onMarkDirty(boolean isPageInCheckpoint) { |
| assert cpLockStateChecker.checkpointLockIsHeldByThread(); |
| |
| AtomicInteger writtenPagesCntr = cpProgress.writtenPagesCounter(); |
| |
| if (writtenPagesCntr == null) { |
| speedForMarkAll = 0; |
| targetDirtyRatio = -1; |
| currDirtyRatio = -1; |
| |
| return; // Don't throttle if checkpoint is not running. |
| } |
| |
| int cpWrittenPages = writtenPagesCntr.get(); |
| |
| long fullyCompletedPages = (cpWrittenPages + cpSyncedPages()) / 2; // written & sync'ed |
| |
| long curNanoTime = System.nanoTime(); |
| |
| speedCpWrite.setCounter(fullyCompletedPages, curNanoTime); |
| |
| long markDirtySpeed = speedMarkAndAvgParkTime.getSpeedOpsPerSec(curNanoTime); |
| |
| long curCpWriteSpeed = speedCpWrite.getSpeedOpsPerSec(curNanoTime); |
| |
| threadIds.add(Thread.currentThread().getId()); |
| |
| ThrottleMode level = ThrottleMode.NO; //should apply delay (throttling) for current page modification |
| |
| if (isPageInCheckpoint) { |
| int checkpointBufLimit = pageMemory.checkpointBufferPagesSize() * 2 / 3; |
| |
| if (pageMemory.checkpointBufferPagesCount() > checkpointBufLimit) |
| level = ThrottleMode.EXPONENTIAL; |
| } |
| |
| long throttleParkTimeNs = 0; |
| |
| if (level == ThrottleMode.NO) { |
| int nThreads = threadIds.size(); |
| |
| int cpTotalPages = cpTotalPages(); |
| |
| if (cpTotalPages == 0) { |
| boolean throttleByCpSpeed = curCpWriteSpeed > 0 && markDirtySpeed > curCpWriteSpeed; |
| |
| if (throttleByCpSpeed) { |
| throttleParkTimeNs = calcDelayTime(curCpWriteSpeed, nThreads, 1); |
| |
| level = ThrottleMode.LIMITED; |
| } |
| } |
| else { |
| double dirtyPagesRatio = pageMemory.getDirtyPagesRatio(); |
| |
| currDirtyRatio = dirtyPagesRatio; |
| |
| detectCpPagesWriteStart(cpWrittenPages, dirtyPagesRatio); |
| |
| if (dirtyPagesRatio >= MAX_DIRTY_PAGES) |
| level = ThrottleMode.NO; // too late to throttle, will wait on safe to update instead. |
| else { |
| int notEvictedPagesTotal = cpTotalPages - cpEvictedPages(); |
| |
| throttleParkTimeNs = getParkTime(dirtyPagesRatio, |
| fullyCompletedPages, |
| notEvictedPagesTotal < 0 ? 0 : notEvictedPagesTotal, |
| nThreads, |
| markDirtySpeed, |
| curCpWriteSpeed); |
| |
| level = throttleParkTimeNs == 0 ? ThrottleMode.NO : ThrottleMode.LIMITED; |
| } |
| } |
| } |
| |
| if (level == ThrottleMode.EXPONENTIAL) { |
| int exponent = exponentialBackoffCntr.getAndIncrement(); |
| |
| throttleParkTimeNs = (long)(STARTING_THROTTLE_NANOS * Math.pow(BACKOFF_RATIO, exponent)); |
| } |
| else { |
| if (isPageInCheckpoint) |
| exponentialBackoffCntr.set(0); |
| |
| if (level == ThrottleMode.NO) |
| throttleParkTimeNs = 0; |
| } |
| |
| if (throttleParkTimeNs > 0) { |
| recurrentLogIfNeed(); |
| |
| doPark(throttleParkTimeNs); |
| } |
| |
| speedMarkAndAvgParkTime.addMeasurementForAverageCalculation(throttleParkTimeNs); |
| } |
| |
| /** |
| * Disables the current thread for thread scheduling purposes. May be overriden by subclasses for tests |
| * |
| * @param throttleParkTimeNs the maximum number of nanoseconds to wait |
| */ |
| protected void doPark(long throttleParkTimeNs) { |
| if (throttleParkTimeNs > LOGGING_THRESHOLD) { |
| U.warn(log, "Parking thread=" + Thread.currentThread().getName() |
| + " for timeout(ms)=" + (throttleParkTimeNs / 1_000_000)); |
| } |
| |
| LockSupport.parkNanos(throttleParkTimeNs); |
| } |
| |
| /** |
| * @return number of written pages. |
| */ |
| private int cpWrittenPages() { |
| AtomicInteger writtenPagesCntr = cpProgress.writtenPagesCounter(); |
| |
| return writtenPagesCntr == null ? 0 : writtenPagesCntr.get(); |
| } |
| |
| /** |
| * @return Number of pages in current checkpoint. |
| */ |
| private int cpTotalPages() { |
| return cpProgress.currentCheckpointPagesCount(); |
| } |
| |
| /** |
| * @return Counter for fsynced checkpoint pages. |
| */ |
| private int cpSyncedPages() { |
| AtomicInteger syncedPagesCntr = cpProgress.syncedPagesCounter(); |
| |
| return syncedPagesCntr == null ? 0 : syncedPagesCntr.get(); |
| } |
| |
| /** |
| * @return number of evicted pages. |
| */ |
| private int cpEvictedPages() { |
| AtomicInteger evictedPagesCntr = cpProgress.evictedPagesCntr(); |
| |
| return evictedPagesCntr == null ? 0 : evictedPagesCntr.get(); |
| } |
| |
| /** |
| * Prints warning to log if throttling is occurred and requires markable amount of time. |
| */ |
| private void recurrentLogIfNeed() { |
| long prevWarningNs = prevWarnTime.get(); |
| long curNs = System.nanoTime(); |
| |
| if (prevWarningNs != 0 && (curNs - prevWarningNs) <= WARN_MIN_DELAY_NS) |
| return; |
| |
| double weight = throttleWeight(); |
| if (weight <= WARN_THRESHOLD) |
| return; |
| |
| if (prevWarnTime.compareAndSet(prevWarningNs, curNs)) { |
| String msg = String.format("Throttling is applied to page modifications " + |
| "[percentOfPartTime=%.2f, markDirty=%d pages/sec, checkpointWrite=%d pages/sec, " + |
| "estIdealMarkDirty=%d pages/sec, curDirty=%.2f, maxDirty=%.2f, avgParkTime=%d ns, " + |
| "pages: (total=%d, evicted=%d, written=%d, synced=%d, cpBufUsed=%d, cpBufTotal=%d)]", |
| weight, getMarkDirtySpeed(), getCpWriteSpeed(), |
| getLastEstimatedSpeedForMarkAll(), getCurrDirtyRatio(), getTargetDirtyRatio(), throttleParkTime(), |
| cpTotalPages(), cpEvictedPages(), cpWrittenPages(), cpSyncedPages(), |
| pageMemory.checkpointBufferPagesCount(), pageMemory.checkpointBufferPagesSize()); |
| |
| log.info(msg); |
| } |
| } |
| |
| /** |
| * @param dirtyPagesRatio actual percent of dirty pages. |
| * @param fullyCompletedPages written & fsynced pages count. |
| * @param cpTotalPages total checkpoint scope. |
| * @param nThreads number of threads providing data during current checkpoint. |
| * @param markDirtySpeed registered mark dirty speed, pages/sec. |
| * @param curCpWriteSpeed average checkpoint write speed, pages/sec. |
| * @return time in nanoseconds to part or 0 if throttling is not required. |
| */ |
| long getParkTime( |
| double dirtyPagesRatio, |
| long fullyCompletedPages, |
| int cpTotalPages, |
| int nThreads, |
| long markDirtySpeed, |
| long curCpWriteSpeed) { |
| |
| long speedForMarkAll = calcSpeedToMarkAllSpaceTillEndOfCp(dirtyPagesRatio, |
| fullyCompletedPages, |
| curCpWriteSpeed, |
| cpTotalPages); |
| |
| double targetDirtyRatio = calcTargetDirtyRatio(fullyCompletedPages, cpTotalPages); |
| |
| this.speedForMarkAll = speedForMarkAll; //publish for metrics |
| this.targetDirtyRatio = targetDirtyRatio; //publish for metrics |
| |
| boolean lowSpaceLeft = dirtyPagesRatio > targetDirtyRatio && (dirtyPagesRatio + 0.05 > MAX_DIRTY_PAGES); |
| int slowdown = lowSpaceLeft ? 3 : 1; |
| |
| double multiplierForSpeedForMarkAll = lowSpaceLeft |
| ? 0.8 |
| : 1.0; |
| |
| boolean markingTooFast = speedForMarkAll > 0 && markDirtySpeed > multiplierForSpeedForMarkAll * speedForMarkAll; |
| boolean throttleBySizeAndMarkSpeed = dirtyPagesRatio > targetDirtyRatio && markingTooFast; |
| |
| //for case of speedForMarkAll >> markDirtySpeed, allow write little bit faster than CP average |
| double allowWriteFasterThanCp = (speedForMarkAll > 0 && markDirtySpeed > 0 && speedForMarkAll > markDirtySpeed) |
| ? (0.1 * speedForMarkAll / markDirtySpeed) |
| : (dirtyPagesRatio > targetDirtyRatio ? 0.0 : 0.1); |
| |
| double fasterThanCpWriteSpeed = lowSpaceLeft |
| ? 1.0 |
| : 1.0 + allowWriteFasterThanCp; |
| boolean throttleByCpSpeed = curCpWriteSpeed > 0 && markDirtySpeed > (fasterThanCpWriteSpeed * curCpWriteSpeed); |
| |
| long delayByCpWrite = throttleByCpSpeed ? calcDelayTime(curCpWriteSpeed, nThreads, slowdown) : 0; |
| long delayByMarkAllWrite = throttleBySizeAndMarkSpeed ? calcDelayTime(speedForMarkAll, nThreads, slowdown) : 0; |
| return Math.max(delayByCpWrite, delayByMarkAllWrite); |
| } |
| |
| /** |
| * @param dirtyPagesRatio current percent of dirty pages. |
| * @param fullyCompletedPages count of written and sync'ed pages |
| * @param curCpWriteSpeed pages/second checkpoint write speed. 0 speed means 'no data'. |
| * @param cpTotalPages total pages in checkpoint. |
| * @return pages/second to mark to mark all clean pages as dirty till the end of checkpoint. 0 speed means 'no |
| * data'. |
| */ |
| private long calcSpeedToMarkAllSpaceTillEndOfCp(double dirtyPagesRatio, |
| long fullyCompletedPages, |
| long curCpWriteSpeed, |
| int cpTotalPages) { |
| |
| if (curCpWriteSpeed == 0) |
| return 0; |
| |
| if (cpTotalPages <= 0) |
| return 0; |
| |
| if (dirtyPagesRatio >= MAX_DIRTY_PAGES) |
| return 0; |
| |
| double remainedClear = (MAX_DIRTY_PAGES - dirtyPagesRatio) * totalPages; |
| |
| double timeRemainedSeconds = 1.0 * (cpTotalPages - fullyCompletedPages) / curCpWriteSpeed; |
| |
| return (long)(remainedClear / timeRemainedSeconds); |
| } |
| |
| /** |
| * @param fullyCompletedPages number of completed. |
| * @param cpTotalPages Total amount of pages under checkpoint. |
| * @return size-based calculation of target ratio. |
| */ |
| private double calcTargetDirtyRatio(long fullyCompletedPages, int cpTotalPages) { |
| double cpProgress = ((double)fullyCompletedPages) / cpTotalPages; |
| |
| // Starting with initialDirtyRatioAtCpBegin to avoid throttle right after checkpoint start |
| double constStart = initDirtyRatioAtCpBegin; |
| |
| double throttleTotalWeight = 1.0 - constStart; |
| |
| // .75 is maximum ratio of dirty pages |
| return (cpProgress * throttleTotalWeight + constStart) * MAX_DIRTY_PAGES; |
| } |
| |
| /** |
| * @param baseSpeed speed to slow down. |
| * @param nThreads operating threads. |
| * @param coefficient how much it is needed to slowdown base speed. 1.0 means delay to get exact base speed. |
| * @return sleep time in nanoseconds. |
| */ |
| private long calcDelayTime(long baseSpeed, int nThreads, double coefficient) { |
| if (coefficient <= 0.0) |
| return 0; |
| |
| if (baseSpeed <= 0) |
| return 0; |
| |
| long updTimeNsForOnePage = TimeUnit.SECONDS.toNanos(1) * nThreads / (baseSpeed); |
| |
| return (long)(coefficient * updTimeNsForOnePage); |
| } |
| |
| /** |
| * @param cpWrittenPages current counter of written pages. |
| * @param dirtyPagesRatio current percent of dirty pages. |
| */ |
| private void detectCpPagesWriteStart(int cpWrittenPages, double dirtyPagesRatio) { |
| if (cpWrittenPages > 0 && lastObservedWritten.compareAndSet(0, cpWrittenPages)) { |
| double newMinRatio = dirtyPagesRatio; |
| |
| if (newMinRatio < MIN_RATIO_NO_THROTTLE) |
| newMinRatio = MIN_RATIO_NO_THROTTLE; |
| |
| if (newMinRatio > 1) |
| newMinRatio = 1; |
| |
| //for slow cp is completed now, drop previous dirty page percent |
| initDirtyRatioAtCpBegin = newMinRatio; |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void onBeginCheckpoint() { |
| speedCpWrite.setCounter(0L, System.nanoTime()); |
| |
| initDirtyRatioAtCpBegin = MIN_RATIO_NO_THROTTLE; |
| |
| lastObservedWritten.set(0); |
| } |
| |
| |
| /** {@inheritDoc} */ |
| @Override public void onFinishCheckpoint() { |
| exponentialBackoffCntr.set(0); |
| |
| speedCpWrite.finishInterval(); |
| speedMarkAndAvgParkTime.finishInterval(); |
| threadIds.clear(); |
| } |
| |
| /** |
| * @return Exponential backoff counter. |
| */ |
| public long throttleParkTime() { |
| return speedMarkAndAvgParkTime.getAverage(); |
| } |
| |
| /** |
| * @return Target (maximum) dirty pages ratio, after which throttling will start. |
| */ |
| public double getTargetDirtyRatio() { |
| return targetDirtyRatio; |
| } |
| |
| /** |
| * @return Current dirty pages ratio. |
| */ |
| public double getCurrDirtyRatio() { |
| double ratio = currDirtyRatio; |
| |
| if (ratio >= 0) |
| return ratio; |
| |
| return pageMemory.getDirtyPagesRatio(); |
| } |
| |
| /** |
| * @return Speed of marking pages dirty. Value from past 750-1000 millis only. Pages/second. |
| */ |
| public long getMarkDirtySpeed() { |
| return speedMarkAndAvgParkTime.getSpeedOpsPerSec(System.nanoTime()); |
| } |
| |
| /** |
| * @return Speed average checkpoint write speed. Current and 3 past checkpoints used. Pages/second. |
| */ |
| public long getCpWriteSpeed() { |
| return speedCpWrite.getSpeedOpsPerSecReadOnly(); |
| } |
| |
| /** |
| * @return Returns {@link #speedForMarkAll}. |
| */ |
| public long getLastEstimatedSpeedForMarkAll() { |
| return speedForMarkAll; |
| } |
| |
| /** |
| * Measurement shows how much throttling time is involved into average marking time. |
| * @return metric started from 0.0 and showing how much throttling is involved into current marking process. |
| */ |
| public double throttleWeight() { |
| long speed = speedMarkAndAvgParkTime.getSpeedOpsPerSec(System.nanoTime()); |
| |
| if (speed <= 0) |
| return 0; |
| |
| long timeForOnePage = calcDelayTime(speed, threadIds.size(), 1); |
| |
| if (timeForOnePage == 0) |
| return 0; |
| |
| return 1.0 * throttleParkTime() / timeForOnePage; |
| } |
| |
| /** |
| * Throttling mode for page. |
| */ |
| private enum ThrottleMode { |
| /** No delay is applied. */ |
| NO, |
| |
| /** Limited, time is based on target speed. */ |
| LIMITED, |
| |
| /** Exponential. */ |
| EXPONENTIAL |
| } |
| } |