blob: 2828c4348af8d85ba69fe0027cbe4d38b8741dc1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.persistence.pagemem;
import java.util.Collection;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.internal.processors.cache.persistence.CheckpointLockStateChecker;
import org.apache.ignite.internal.processors.cache.persistence.CheckpointWriteProgressSupplier;
import org.apache.ignite.internal.util.typedef.internal.U;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_STARVATION_CHECK_INTERVAL;
/**
* Throttles threads that generate dirty pages during ongoing checkpoint.
* Designed to avoid zero dropdowns that can happen if checkpoint buffer is overflowed.
*/
public class PagesWriteThrottle implements PagesWriteThrottlePolicy {
/** Page memory. */
private final PageMemoryImpl pageMemory;
/** Database manager. */
private final CheckpointWriteProgressSupplier cpProgress;
/** If true, throttle will only protect from checkpoint buffer overflow, not from dirty pages ratio cap excess. */
private final boolean throttleOnlyPagesInCheckpoint;
/** Checkpoint lock state checker. */
private CheckpointLockStateChecker stateChecker;
/** Starting throttle time. Limits write speed to 1000 MB/s. */
private static final long STARTING_THROTTLE_NANOS = 4000;
/** Backoff ratio. Each next park will be this times longer. */
private static final double BACKOFF_RATIO = 1.05;
/** Checkpoint buffer fullfill upper bound. */
private static final float CP_BUF_FILL_THRESHOLD = 2f / 3;
/** Counter for dirty pages ratio throttling. */
private final AtomicInteger notInCheckpointBackoffCntr = new AtomicInteger(0);
/** Counter for checkpoint buffer usage ratio throttling (we need a separate one due to IGNITE-7751). */
private final AtomicInteger inCheckpointBackoffCntr = new AtomicInteger(0);
/** Logger. */
private IgniteLogger log;
/** Currently parking threads. */
private final Collection<Thread> parkThrds = new ConcurrentLinkedQueue<>();
/**
* @param pageMemory Page memory.
* @param cpProgress Database manager.
* @param stateChecker checkpoint lock state checker.
* @param throttleOnlyPagesInCheckpoint If true, throttle will only protect from checkpoint buffer overflow.
* @param log Logger.
*/
public PagesWriteThrottle(PageMemoryImpl pageMemory,
CheckpointWriteProgressSupplier cpProgress,
CheckpointLockStateChecker stateChecker,
boolean throttleOnlyPagesInCheckpoint,
IgniteLogger log
) {
this.pageMemory = pageMemory;
this.cpProgress = cpProgress;
this.stateChecker = stateChecker;
this.throttleOnlyPagesInCheckpoint = throttleOnlyPagesInCheckpoint;
this.log = log;
if (!throttleOnlyPagesInCheckpoint)
assert cpProgress != null : "cpProgress must be not null if ratio based throttling mode is used";
}
/** {@inheritDoc} */
@Override public void onMarkDirty(boolean isPageInCheckpoint) {
assert stateChecker.checkpointLockIsHeldByThread();
boolean shouldThrottle = false;
if (isPageInCheckpoint) {
int checkpointBufLimit = (int)(pageMemory.checkpointBufferPagesSize() * CP_BUF_FILL_THRESHOLD);
shouldThrottle = pageMemory.checkpointBufferPagesCount() > checkpointBufLimit;
}
if (!shouldThrottle && !throttleOnlyPagesInCheckpoint) {
AtomicInteger writtenPagesCntr = cpProgress.writtenPagesCounter();
if (writtenPagesCntr == null)
return; // Don't throttle if checkpoint is not running.
int cpWrittenPages = writtenPagesCntr.get();
int cpTotalPages = cpProgress.currentCheckpointPagesCount();
if (cpWrittenPages == cpTotalPages) {
// Checkpoint is already in fsync stage, increasing maximum ratio of dirty pages to 3/4
shouldThrottle = pageMemory.shouldThrottle(3.0 / 4);
} else {
double dirtyRatioThreshold = ((double)cpWrittenPages) / cpTotalPages;
// Starting with 0.05 to avoid throttle right after checkpoint start
// 7/12 is maximum ratio of dirty pages
dirtyRatioThreshold = (dirtyRatioThreshold * 0.95 + 0.05) * 7 / 12;
shouldThrottle = pageMemory.shouldThrottle(dirtyRatioThreshold);
}
}
AtomicInteger cntr = isPageInCheckpoint ? inCheckpointBackoffCntr : notInCheckpointBackoffCntr;
if (shouldThrottle) {
int throttleLevel = cntr.getAndIncrement();
long throttleParkTimeNs = (long) (STARTING_THROTTLE_NANOS * Math.pow(BACKOFF_RATIO, throttleLevel));
if (throttleParkTimeNs > LOGGING_THRESHOLD) {
U.warn(log, "Parking thread=" + Thread.currentThread().getName()
+ " for timeout(ms)=" + (throttleParkTimeNs / 1_000_000));
}
if (isPageInCheckpoint)
parkThrds.add(Thread.currentThread());
LockSupport.parkNanos(throttleParkTimeNs);
}
else {
int oldCntr = cntr.getAndSet(0);
if (isPageInCheckpoint && oldCntr != 0) {
parkThrds.forEach(LockSupport::unpark);
parkThrds.clear();
}
}
}
/** {@inheritDoc} */
@Override public void onBeginCheckpoint() {
}
/** {@inheritDoc} */
@Override public void onFinishCheckpoint() {
inCheckpointBackoffCntr.set(0);
notInCheckpointBackoffCntr.set(0);
}
}