blob: c8422220630a0ac8b8bd5d1afa85b5624a57bde5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.checkpoint;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator.CheckpointTriggerRequest;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.clock.Clock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Comparator;
import java.util.NavigableSet;
import java.util.Optional;
import java.util.PriorityQueue;
import java.util.TreeSet;
import java.util.function.BiConsumer;
import java.util.function.IntSupplier;
import static java.lang.System.currentTimeMillis;
import static java.lang.System.identityHashCode;
import static org.apache.flink.runtime.checkpoint.CheckpointFailureReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS;
import static org.apache.flink.runtime.checkpoint.CheckpointFailureReason.TOO_MANY_CHECKPOINT_REQUESTS;
/**
* Decides whether a {@link CheckpointCoordinator.CheckpointTriggerRequest checkpoint request}
* should be executed, dropped or postponed. Dropped requests are failed immediately. Postponed
* requests are enqueued into a queue and can be dequeued later.
*
* <p>Decision is made according to:
*
* <ul>
* <li>checkpoint properties (e.g. isForce, isPeriodic)
* <li>checkpointing configuration (e.g. max concurrent checkpoints, min pause)
* <li>current state (other queued requests, pending checkpoints, last checkpoint completion time)
* </ul>
*/
@SuppressWarnings("ConstantConditions")
class CheckpointRequestDecider {
private static final Logger LOG = LoggerFactory.getLogger(CheckpointRequestDecider.class);
private static final int LOG_TIME_IN_QUEUE_THRESHOLD_MS = 100;
private static final int DEFAULT_MAX_QUEUED_REQUESTS = 1000;
private final int maxConcurrentCheckpointAttempts;
private final BiConsumer<Long, Long> rescheduleTrigger;
private final Clock clock;
private final long minPauseBetweenCheckpoints;
private final IntSupplier pendingCheckpointsSizeSupplier;
private final IntSupplier numberOfCleaningCheckpointsSupplier;
private final NavigableSet<CheckpointTriggerRequest> queuedRequests =
new TreeSet<>(checkpointTriggerRequestsComparator());
private final int maxQueuedRequests;
CheckpointRequestDecider(
int maxConcurrentCheckpointAttempts,
BiConsumer<Long, Long> rescheduleTrigger,
Clock clock,
long minPauseBetweenCheckpoints,
IntSupplier pendingCheckpointsSizeSupplier,
IntSupplier numberOfCleaningCheckpointsSupplier) {
this(
maxConcurrentCheckpointAttempts,
rescheduleTrigger,
clock,
minPauseBetweenCheckpoints,
pendingCheckpointsSizeSupplier,
numberOfCleaningCheckpointsSupplier,
DEFAULT_MAX_QUEUED_REQUESTS);
}
CheckpointRequestDecider(
int maxConcurrentCheckpointAttempts,
BiConsumer<Long, Long> rescheduleTrigger,
Clock clock,
long minPauseBetweenCheckpoints,
IntSupplier pendingCheckpointsSizeSupplier,
IntSupplier numberOfCleaningCheckpointsSupplier,
int maxQueuedRequests) {
Preconditions.checkArgument(maxConcurrentCheckpointAttempts > 0);
Preconditions.checkArgument(maxQueuedRequests > 0);
this.maxConcurrentCheckpointAttempts = maxConcurrentCheckpointAttempts;
this.rescheduleTrigger = rescheduleTrigger;
this.clock = clock;
this.minPauseBetweenCheckpoints = minPauseBetweenCheckpoints;
this.pendingCheckpointsSizeSupplier = pendingCheckpointsSizeSupplier;
this.numberOfCleaningCheckpointsSupplier = numberOfCleaningCheckpointsSupplier;
this.maxQueuedRequests = maxQueuedRequests;
}
/**
* Submit a new checkpoint request and decide whether it or some other request can be executed.
*
* @return request that should be executed
*/
Optional<CheckpointTriggerRequest> chooseRequestToExecute(
CheckpointTriggerRequest newRequest, boolean isTriggering, long lastCompletionMs) {
if (queuedRequests.size() >= maxQueuedRequests && !queuedRequests.last().isPeriodic) {
// there are only non-periodic (ie user-submitted) requests enqueued - retain them and
// drop the new one
newRequest.completeExceptionally(new CheckpointException(TOO_MANY_CHECKPOINT_REQUESTS));
return Optional.empty();
} else {
queuedRequests.add(newRequest);
if (queuedRequests.size() > maxQueuedRequests) {
queuedRequests
.pollLast()
.completeExceptionally(
new CheckpointException(TOO_MANY_CHECKPOINT_REQUESTS));
}
Optional<CheckpointTriggerRequest> request =
chooseRequestToExecute(isTriggering, lastCompletionMs);
request.ifPresent(CheckpointRequestDecider::logInQueueTime);
return request;
}
}
/**
* Choose one of the queued requests to execute, if any.
*
* @return request that should be executed
*/
Optional<CheckpointTriggerRequest> chooseQueuedRequestToExecute(
boolean isTriggering, long lastCompletionMs) {
Optional<CheckpointTriggerRequest> request =
chooseRequestToExecute(isTriggering, lastCompletionMs);
request.ifPresent(CheckpointRequestDecider::logInQueueTime);
return request;
}
/**
* Choose the next {@link CheckpointTriggerRequest request} to execute based on the provided
* candidate and the current state. Acquires a lock and may update the state.
*
* @return request that should be executed
*/
private Optional<CheckpointTriggerRequest> chooseRequestToExecute(
boolean isTriggering, long lastCompletionMs) {
if (isTriggering
|| queuedRequests.isEmpty()
|| numberOfCleaningCheckpointsSupplier.getAsInt()
> maxConcurrentCheckpointAttempts) {
return Optional.empty();
}
if (pendingCheckpointsSizeSupplier.getAsInt() >= maxConcurrentCheckpointAttempts) {
return Optional.of(queuedRequests.first())
.filter(CheckpointTriggerRequest::isForce)
.map(unused -> queuedRequests.pollFirst());
}
CheckpointTriggerRequest first = queuedRequests.first();
if (!first.isForce() && first.isPeriodic) {
long currentRelativeTime = clock.relativeTimeMillis();
long nextTriggerDelayMillis =
lastCompletionMs - currentRelativeTime + minPauseBetweenCheckpoints;
if (nextTriggerDelayMillis > 0) {
queuedRequests
.pollFirst()
.completeExceptionally(
new CheckpointException(MINIMUM_TIME_BETWEEN_CHECKPOINTS));
rescheduleTrigger.accept(currentRelativeTime, nextTriggerDelayMillis);
return Optional.empty();
}
}
return Optional.of(queuedRequests.pollFirst());
}
@VisibleForTesting
@Deprecated
PriorityQueue<CheckpointTriggerRequest> getTriggerRequestQueue() {
return new PriorityQueue<>(queuedRequests);
}
void abortAll(CheckpointException exception) {
while (!queuedRequests.isEmpty()) {
queuedRequests.pollFirst().completeExceptionally(exception);
}
}
int getNumQueuedRequests() {
return queuedRequests.size();
}
private static Comparator<CheckpointTriggerRequest> checkpointTriggerRequestsComparator() {
return (r1, r2) -> {
if (r1.props.isSavepoint() != r2.props.isSavepoint()) {
return r1.props.isSavepoint() ? -1 : 1;
} else if (r1.isForce() != r2.isForce()) {
return r1.isForce() ? -1 : 1;
} else if (r1.isPeriodic != r2.isPeriodic) {
return r1.isPeriodic ? 1 : -1;
} else if (r1.timestamp != r2.timestamp) {
return Long.compare(r1.timestamp, r2.timestamp);
} else {
return Integer.compare(identityHashCode(r1), identityHashCode(r2));
}
};
}
private static void logInQueueTime(CheckpointTriggerRequest request) {
if (LOG.isInfoEnabled()) {
long timeInQueue = currentTimeMillis() - request.timestamp;
if (timeInQueue > LOG_TIME_IN_QUEUE_THRESHOLD_MS) {
LOG.info("checkpoint request time in queue: {}", timeInQueue);
}
}
}
}