blob: 4d3b985d98729970121ad1550f06fa3ead3f7c87 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ratis.server.impl;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
import org.apache.ratis.proto.RaftProtos.WatchRequestTypeProto;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.exceptions.NotReplicatedException;
import org.apache.ratis.protocol.exceptions.ResourceUnavailableException;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.util.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
/**
 * Pending watch requests, kept in one {@link WatchQueue} per {@link ReplicationLevel}.
 *
 * <p>A watch for index {@code i} completes normally once {@link #update(ReplicationLevel, long)}
 * advances the index of its replication level to at least {@code i}. Otherwise it completes
 * exceptionally, either with {@link NotReplicatedException} when the watch timeout elapses, or
 * with the exception passed to {@link #failWatches(Exception)}.
 */
class WatchRequests {
  public static final Logger LOG = LoggerFactory.getLogger(WatchRequests.class);

  /**
   * A watch waiting in a {@link WatchQueue}.
   *
   * <p>Requests with the same index and the same rounded-up creation time share one instance,
   * and hence one future.
   */
  static class PendingWatch {
    private final WatchRequestTypeProto watch;
    private final Timestamp creationTime;
    private final Supplier<CompletableFuture<Void>> future = JavaUtils.memoize(CompletableFuture::new);

    PendingWatch(WatchRequestTypeProto watch, Timestamp creationTime) {
      this.watch = watch;
      this.creationTime = creationTime;
    }

    /** @return the future completed when this watch is satisfied, timed out or failed. */
    CompletableFuture<Void> getFuture() {
      return future.get();
    }

    /** @return the index being watched. */
    long getIndex() {
      return watch.getIndex();
    }

    /** @return the creation time, rounded up using the watch timeout denomination. */
    Timestamp getCreationTime() {
      return creationTime;
    }

    @Override
    public String toString() {
      return RaftClientRequest.Type.toString(watch) + "@" + creationTime
          + "?" + StringUtils.completableFuture2String(future.get(), true);
    }
  }

  /**
   * The pending watches of a single replication level, ordered by (index, creation time).
   *
   * <p>State changes happen while holding this queue's monitor. Futures are completed only after
   * the monitor has been released, so dependent stages never run while holding the lock.
   */
  private class WatchQueue {
    private final ReplicationLevel replication;
    /**
     * The pending watches. Watches that compare equal (same index and same creation time) are
     * merged into a single entry.
     */
    private final SortedMap<PendingWatch, PendingWatch> q = new TreeMap<>(
        Comparator.comparingLong(PendingWatch::getIndex).thenComparing(PendingWatch::getCreationTime));
    /** Limits the number of elements in {@link #q}; one permit is held per element. */
    private final ResourceSemaphore resource;
    /** The current index. Invariant: q.isEmpty() or index < the index of every element in q. */
    private volatile long index;

    WatchQueue(ReplicationLevel replication, int elementLimit) {
      this.replication = replication;
      this.resource = new ResourceSemaphore(elementLimit);
    }

    long getIndex() {
      return index;
    }

    /**
     * Adds a watch for the given request, or joins an existing equal watch.
     *
     * @return null if the watch is already satisfied by the current index. Returns a future that
     *         has already failed with {@link ResourceUnavailableException} if no permit could be
     *         acquired. Otherwise returns the future of the (possibly shared) pending watch.
     */
    CompletableFuture<Void> add(RaftClientRequest request) {
      final long currentTime = Timestamp.currentTimeNanos();
      // Rounding the creation time up lets watches for the same index created at nearby times
      // map to the same key in q, so they share one future and one timeout task.
      final long roundUp = watchTimeoutDenominationNanos.roundUpNanos(currentTime);
      final PendingWatch pending = new PendingWatch(request.getType().getWatch(), Timestamp.valueOf(roundUp));

      final PendingWatch computed;
      synchronized (this) {
        if (pending.getIndex() <= getIndex()) { // compare again synchronized
          // watch condition already satisfied
          return null;
        }
        // Reuse an existing equal watch; otherwise insert the new one only if a permit is available.
        computed = q.compute(pending, (key, old) -> old != null? old: resource.tryAcquire()? pending: null);
      }

      if (computed == null) {
        // failed to acquire
        return JavaUtils.completeExceptionally(new ResourceUnavailableException(
            "Failed to acquire a pending watch request in " + name + " for " + request));
      }
      if (computed != pending) {
        // already exists in q
        return computed.getFuture();
      }

      // Newly added to q. The delay is the watch timeout plus the rounding slack
      // (roundUp - currentTime), i.e. it is measured from the rounded-up creation time.
      final TimeDuration timeout = watchTimeoutNanos.apply(duration -> duration + roundUp - currentTime);
      scheduler.onTimeout(timeout, () -> handleTimeout(request, pending),
          LOG, () -> name + ": Failed to timeout " + request);
      return pending.getFuture();
    }

    /** Fails the watch with {@link NotReplicatedException} if it is still pending. */
    void handleTimeout(RaftClientRequest request, PendingWatch pending) {
      if (removeExisting(pending)) {
        pending.getFuture().completeExceptionally(
            new NotReplicatedException(request.getCallId(), replication, pending.getIndex()));
        LOG.debug("{}: timeout {}, {}", name, pending, request);
      }
    }

    /**
     * Removes the given watch from q and releases its permit.
     *
     * @return true if the watch was present, false if it had already been removed.
     */
    synchronized boolean removeExisting(PendingWatch pending) {
      final PendingWatch removed = q.remove(pending);
      if (removed == null) {
        return false;
      }
      Preconditions.assertTrue(removed == pending);
      resource.release();
      return true;
    }

    /**
     * Advances the index to newIndex if it is larger, then completes every watch whose index is
     * now satisfied. The watches are removed under the monitor and completed after releasing it.
     */
    @SuppressFBWarnings("NP_NULL_PARAM_DEREF")
    void updateIndex(final long newIndex) {
      final List<PendingWatch> satisfied = new ArrayList<>();
      synchronized (this) {
        if (newIndex <= getIndex()) { // compare again synchronized
          return;
        }
        LOG.debug("{}: update {} index from {} to {}", name, replication, index, newIndex);
        index = newIndex;

        while (!q.isEmpty()) {
          final PendingWatch first = q.firstKey();
          if (first.getIndex() > newIndex) {
            break;
          }
          final boolean removed = removeExisting(first);
          Preconditions.assertTrue(removed);
          satisfied.add(first);
        }
      }

      // Complete outside the monitor: dependent stages may run synchronously in this thread.
      // The watches are no longer in q, so neither a timeout nor failAll can complete them again.
      for (PendingWatch w : satisfied) {
        LOG.debug("{}: complete {}", name, w);
        w.getFuture().complete(null);
      }
    }

    /**
     * Fails all pending watches with e, then clears q and closes the resource.
     * The queue state is changed under the monitor; the futures are completed after releasing it.
     */
    void failAll(Exception e) {
      final List<PendingWatch> toFail;
      synchronized (this) {
        toFail = new ArrayList<>(q.values());
        q.clear();
        resource.close();
      }

      // Iterate a local snapshot outside the monitor. A dependent stage that re-enters this
      // queue therefore cannot modify the collection being iterated, and cannot block others.
      for (PendingWatch pending : toFail) {
        pending.getFuture().completeExceptionally(e);
      }
    }
  }

  private final String name;
  private final Map<ReplicationLevel, WatchQueue> queues = new EnumMap<>(ReplicationLevel.class);

  /** The watch timeout, in nanoseconds. */
  private final TimeDuration watchTimeoutNanos;
  /** The granularity, in nanoseconds, used to round up watch creation times. */
  private final TimeDuration watchTimeoutDenominationNanos;
  private final TimeoutScheduler scheduler = TimeoutScheduler.getInstance();

  /**
   * Creates a queue for each {@link ReplicationLevel}.
   *
   * @param name prefix of this object's name, which is used in log and exception messages.
   * @param properties source of the watch timeout, timeout denomination and element limit.
   *        The timeout must be a multiple of the denomination; this is checked with
   *        {@code Preconditions.assertTrue}.
   */
  WatchRequests(Object name, RaftProperties properties) {
    this.name = name + "-" + JavaUtils.getClassSimpleName(getClass());

    final TimeDuration watchTimeout = RaftServerConfigKeys.Watch.timeout(properties);
    this.watchTimeoutNanos = watchTimeout.to(TimeUnit.NANOSECONDS);
    final TimeDuration watchTimeoutDenomination = RaftServerConfigKeys.Watch.timeoutDenomination(properties);
    this.watchTimeoutDenominationNanos = watchTimeoutDenomination.to(TimeUnit.NANOSECONDS);
    Preconditions.assertTrue(watchTimeoutNanos.getDuration() % watchTimeoutDenominationNanos.getDuration() == 0L,
        () -> "watchTimeout (=" + watchTimeout + ") is not a multiple of watchTimeoutDenomination (="
            + watchTimeoutDenomination + ").");

    final int elementLimit = RaftServerConfigKeys.Watch.elementLimit(properties);
    Arrays.stream(ReplicationLevel.values()).forEach(r -> queues.put(r, new WatchQueue(r, elementLimit)));
  }

  /**
   * Adds a watch request.
   *
   * @return an already-completed future if the watched index has been reached. Otherwise returns
   *         a future that completes when it is reached, times out, or the watches are failed.
   */
  @SuppressFBWarnings("NP_NULL_PARAM_DEREF")
  CompletableFuture<Void> add(RaftClientRequest request) {
    final WatchRequestTypeProto watch = request.getType().getWatch();
    final WatchQueue queue = queues.get(watch.getReplication());
    if (watch.getIndex() > queue.getIndex()) { // compare without synchronization
      final CompletableFuture<Void> future = queue.add(request);
      if (future != null) {
        return future;
      }
    }
    return CompletableFuture.completedFuture(null);
  }

  /** Advances the index of the given replication level and completes the satisfied watches. */
  void update(ReplicationLevel replication, final long newIndex) {
    final WatchQueue queue = queues.get(replication);
    if (newIndex > queue.getIndex()) { // compare without synchronization
      queue.updateIndex(newIndex);
    }
  }

  /** Fails all pending watches of all replication levels with the given exception. */
  void failWatches(Exception e) {
    queues.values().forEach(q -> q.failAll(e));
  }
}