blob: 7adc7a7210a796c1342a7bbb0dbda82fca50723d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ratis.util;
import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.LongFunction;
/**
* A single-client-to-multiple-server sliding window.
* The client talks to only one server at any time.
* When the current server fails, the client fails over to another server.
*/
public interface SlidingWindow {
/** Logger shared by the sliding window classes declared in this interface. */
Logger LOG = LoggerFactory.getLogger(SlidingWindow.class);
/**
 * Build the display name of a sliding window object.
 * The result has the form {@code <outer>$<inner>:<name>}, where the outer and the inner parts
 * are obtained from {@code JavaUtils.getClassSimpleName} for {@link SlidingWindow}
 * and for the given class respectively.
 *
 * @param clazz the class of the object being named.
 * @param name the caller supplied name; appended using its string representation.
 * @return the display name.
 */
static String getName(Class<?> clazz, Object name) {
  final String outer = JavaUtils.getClassSimpleName(SlidingWindow.class);
  final String inner = JavaUtils.getClassSimpleName(clazz);
  return outer + "$" + inner + ":" + name;
}
/**
 * A request in a sliding window, identified by its seqNum.
 *
 * @param <REPLY> the type of the reply to this request.
 */
interface Request<REPLY> {
/** @return the seqNum of this request; the window uses it as the key of its request map. */
long getSeqNum();
/** Set the given reply to this request. */
void setReply(REPLY reply);
/** @return true iff this request already has a reply; the window treats such a request as replied. */
boolean hasReply();
/** Fail this request with the given exception. */
void fail(Throwable e);
/**
 * Called by the window when it is done with this request,
 * i.e. after the server side has passed it to the processing method
 * or when the request map is cleared.
 * The default implementation does nothing.
 */
default void release() {
}
}
/** A request on the client side of the sliding window. */
interface ClientSideRequest<REPLY> extends Request<REPLY> {
/**
 * Mark this request as the first request.
 * The client side window calls this just before sending the request that it detects as the first one.
 */
void setFirstRequest();
}
/** A request on the server side of the sliding window. */
interface ServerSideRequest<REPLY> extends Request<REPLY> {
/**
 * @return true iff this request is marked as the first request;
 *         the server side window uses it to decide the seqNum to start processing from.
 */
boolean isFirstRequest();
}
/**
 * A seqNum-to-request map, sorted by seqNum.
 * Iterating this object visits the requests in ascending seqNum order.
 *
 * @param <REQUEST> the request type.
 * @param <REPLY> the reply type.
 */
class RequestMap<REQUEST extends Request<REPLY>, REPLY> implements Iterable<REQUEST> {
  /** The name of this map, used as a prefix in log and error messages. */
  private final Object name;
  /** Request map: seqNum -> request */
  private final SortedMap<Long, REQUEST> requests = new ConcurrentSkipListMap<>();

  /**
   * @param name the name of this map.
   */
  RequestMap(Object name) {
    this.name = name;
    if (LOG.isDebugEnabled()) {
      // Only when debug logging is enabled: ask JavaUtils to run log() repeatedly.
      // NOTE(review): (5, 10, SECONDS) is presumably (initial delay, period) -- confirm in JavaUtils.
      JavaUtils.runRepeatedly(this::log, 5, 10, TimeUnit.SECONDS);
    }
  }

  /** @return the name of this map. */
  Object getName() {
    return name;
  }

  /** @return true iff there are no requests in this map. */
  boolean isEmpty() {
    return requests.isEmpty();
  }

  /** @return the request with the given seqNum, or null if there is no such request. */
  private REQUEST get(long seqNum) {
    return requests.get(seqNum);
  }

  /**
   * If the request with the given seqNum is non-replied, return it.
   * Otherwise, return null.
   *
   * A request is non-replied if
   * (1) it is in the request map, and
   * (2) it does not have a reply.
   *
   * @param seqNum the seqNum to look up.
   * @param op the name of the calling operation; used only in the debug messages.
   * @return the non-replied request, or null.
   */
  REQUEST getNonRepliedRequest(long seqNum, String op) {
    final REQUEST found = get(seqNum);
    if (found != null && !found.hasReply()) {
      return found;
    }
    if (found == null) {
      LOG.debug("{}: {}, seq={} not found in {}", getName(), op, seqNum, this);
    } else {
      LOG.debug("{}: {}, seq={} already has replied in {}", getName(), op, seqNum, this);
    }
    return null;
  }

  /**
   * @return the smallest seqNum in this map.
   * @throws java.util.NoSuchElementException if this map is empty.
   */
  long firstSeqNum() {
    return requests.firstKey();
  }

  /**
   * @return the largest seqNum in this map.
   * @throws java.util.NoSuchElementException if this map is empty.
   */
  long lastSeqNum() {
    return requests.lastKey();
  }

  /** Iterate the requests in the order of seqNum. */
  @Override
  public Iterator<REQUEST> iterator() {
    return requests.values().iterator();
  }

  /**
   * Add the given request unless a request with the same seqNum is already in this map.
   *
   * @return true iff the request already exists.
   */
  boolean putIfAbsent(REQUEST request) {
    return requests.putIfAbsent(request.getSeqNum(), request) != null;
  }

  /**
   * Add the given request, which is expected to be new;
   * the insertion is delegated to {@code CollectionUtils.putNew}.
   */
  void putNewRequest(REQUEST request) {
    CollectionUtils.putNew(request.getSeqNum(), request, requests, () -> getName() + ":requests");
  }

  /**
   * Set reply for the request with the given seqNum if it is non-replied.
   * Otherwise, do nothing.
   *
   * @return true iff this method does set the reply for the request.
   */
  boolean setReply(long seqNum, REPLY reply) {
    final REQUEST target = getNonRepliedRequest(seqNum, "setReply");
    final boolean settable = target != null;
    if (settable) {
      LOG.debug("{}: set reply {} for seq={} in {}", getName(), reply, seqNum, this);
      target.setReply(reply);
    } else {
      LOG.debug("{}: DUPLICATED reply {} for seq={} in {}", getName(), reply, seqNum, this);
    }
    return settable;
  }

  /**
   * Handle the end of requests:
   * fail and reply every request with seqNum at or after nextToProcess,
   * remove them from this map, and then put the given end request.
   *
   * @param nextToProcess the next seqNum to be processed; it must not be in this map.
   * @param end the end-of-requests marker to be put into this map.
   * @param replyMethod invoked on each failed request.
   */
  void endOfRequests(long nextToProcess, REQUEST end, Consumer<REQUEST> replyMethod) {
    final REQUEST unexpected = get(nextToProcess);
    Preconditions.assertNull(unexpected,
        () -> "nextToProcessRequest = " + unexpected + " != null, nextToProcess = " + nextToProcess);

    // A view of this map, so clearing it below removes the failed requests from this map.
    final SortedMap<Long, REQUEST> unprocessed = requests.tailMap(nextToProcess);
    for (REQUEST neverProcessed : unprocessed.values()) {
      final String message = getName() + " is closing: seq = " + neverProcessed.getSeqNum()
          + " > nextToProcess = " + nextToProcess
          + " will NEVER be processed; request = " + neverProcessed;
      neverProcessed.fail(new AlreadyClosedException(message));
      replyMethod.accept(neverProcessed);
    }
    unprocessed.clear();

    putNewRequest(end);
  }

  /**
   * Release the requests with seqNum at or after nextToProcess
   * and then remove all the requests from this map.
   */
  void clear(long nextToProcess) {
    LOG.debug("close {}", this);
    final Iterator<REQUEST> unprocessed = requests.tailMap(nextToProcess).values().iterator();
    while (unprocessed.hasNext()) {
      unprocessed.next().release();
    }
    requests.clear();
  }

  /** Log this map and then the reply state of each request at debug level. */
  void log() {
    LOG.debug(toString());
    for (REQUEST each : requests.values()) {
      LOG.debug(" {}: hasReply? {}", each.getSeqNum(), each.hasReply());
    }
  }

  @Override
  public String toString() {
    return getName() + ": requests" + asString(requests);
  }

  /** @return "[]" for an empty map; otherwise, "[first..last]" using the first and the last keys. */
  private static String asString(SortedMap<Long, ?> map) {
    if (map.isEmpty()) {
      return "[]";
    }
    return "[" + map.firstKey() + ".." + map.lastKey() + "]";
  }
}
/**
 * The seqNums of the delayed requests, kept in ascending order.
 * Every method is synchronized on this object.
 */
class DelayedRequests {
  /** seqNum -> seqNum; a sorted map keyed and valued by the same seqNum. */
  private final SortedMap<Long, Long> seqNums = new TreeMap<>();

  /**
   * Record the given seqNum.
   *
   * @return the previously recorded value for the seqNum, or null if it was not recorded.
   */
  synchronized Long put(Long seqNum) {
    final Long previous = seqNums.put(seqNum, seqNum);
    return previous;
  }

  /** @return true iff the given seqNum is recorded. */
  synchronized boolean containsKey(long seqNum) {
    return seqNums.containsKey(seqNum);
  }

  /**
   * Remove all the recorded seqNums.
   *
   * @return the removed seqNums in ascending order.
   */
  synchronized List<Long> getAllAndClear() {
    final List<Long> snapshot = new ArrayList<>(seqNums.size());
    snapshot.addAll(seqNums.keySet());
    seqNums.clear();
    return snapshot;
  }

  /**
   * Remove the given seqNum.
   *
   * @return the removed value, or null if the seqNum was not recorded.
   */
  synchronized Long remove(long seqNum) {
    final Long removed = seqNums.remove(seqNum);
    return removed;
  }

  @Override
  public synchronized String toString() {
    return String.valueOf(seqNums.keySet());
  }
}
/**
 * Client side sliding window.
 * A client may
 * (1) allocate seqNum for new requests;
 * (2) send requests/retries to the server;
 * (3) receive replies/exceptions from the server;
 * (4) return the replies/exceptions to client.
 *
 * Depending on the replies/exceptions, the client may retry the requests
 * to the same or a different server.
 *
 * Until the first request is replied, only the first request is sent
 * and all the other requests are delayed.
 * Once the first request is replied, any request is sent immediately.
 *
 * All the public methods are synchronized on this object.
 *
 * @param <REQUEST> the request type.
 * @param <REPLY> the reply type.
 */
class Client<REQUEST extends ClientSideRequest<REPLY>, REPLY> {
  /** The requests in the sliding window. */
  private final RequestMap<REQUEST, REPLY> requests;
  /** Delayed requests. */
  private final DelayedRequests delayedRequests = new DelayedRequests();

  /** The seqNum for the next new request. */
  private long nextSeqNum = 1;
  /** The seqNum of the first request; -1 means that the first request is not yet submitted. */
  private long firstSeqNum = -1;
  /** Is the first request replied? */
  private boolean firstReplied;
  /** The exception, if there is any.  Once it is set, new requests are failed without being sent. */
  private Throwable exception;

  /**
   * @param name the name of this window; used to build the name of the underlying request map.
   */
  public Client(Object name) {
    this.requests = new RequestMap<REQUEST, REPLY>(getName(getClass(), name)) {
      @Override
      void log() {
        if (LOG.isDebugEnabled()) {
          logDebug();
        }
      }

      /** Log each request as replied, delayed or submitted. */
      synchronized void logDebug() {
        LOG.debug(super.toString());
        for (REQUEST r : requests) {
          LOG.debug(" {}: {}", r.getSeqNum(), r.hasReply() ? "replied"
              : delayedRequests.containsKey(r.getSeqNum()) ? "delayed" : "submitted");
        }
      }
    };
  }

  @Override
  public synchronized String toString() {
    return requests + ", nextSeqNum=" + nextSeqNum
        + ", firstSubmitted=" + firstSeqNum + ", replied? " + firstReplied
        + ", delayed=" + delayedRequests;
  }

  /**
   * A new request arrives, create it with {@link #nextSeqNum}
   * and then try sending it to the server.
   *
   * If this window has already failed, the new request is failed with
   * an {@link AlreadyClosedException} and it is neither stored nor sent;
   * the seqNum is consumed nonetheless.
   *
   * @param requestConstructor use seqNum to create a new request.
   * @param sendMethod invoked to send the request, unless the request has to be delayed.
   * @return the new request.
   */
  public synchronized REQUEST submitNewRequest(
      LongFunction<REQUEST> requestConstructor, Consumer<REQUEST> sendMethod) {
    if (!requests.isEmpty()) {
      // The seqNums in the window must be contiguous up to the next seqNum.
      Preconditions.assertTrue(nextSeqNum == requests.lastSeqNum() + 1,
          () -> "nextSeqNum=" + nextSeqNum + " but " + this);
    }

    final long seqNum = nextSeqNum++;
    final REQUEST r = requestConstructor.apply(seqNum);

    if (exception != null) {
      alreadyClosed(r, exception);
      return r;
    }

    requests.putNewRequest(r);

    final boolean submitted = sendOrDelayRequest(r, sendMethod);
    LOG.debug("{}: submitting a new request {} in {}? {}",
        requests.getName(), r, this, submitted? "submitted": "delayed");
    return r;
  }

  /**
   * Send the given request if it is allowed; otherwise, record it as delayed.
   * The request must be a non-replied request in this window.
   *
   * @return true if the request is sent; false if it is delayed.
   */
  private boolean sendOrDelayRequest(REQUEST request, Consumer<REQUEST> sendMethod) {
    final long seqNum = request.getSeqNum();
    Preconditions.assertTrue(requests.getNonRepliedRequest(seqNum, "sendOrDelayRequest") == request);

    if (firstReplied) {
      // already received the reply for the first request, submit any request.
      sendMethod.accept(request);
      return true;
    }

    if (firstSeqNum == -1 && seqNum == requests.firstSeqNum()) {
      // first request is not yet submitted and this is the first request, submit it.
      LOG.debug("{}: detect firstSubmitted {} in {}", requests.getName(), request, this);
      firstSeqNum = seqNum;
      request.setFirstRequest();
      sendMethod.accept(request);
      return true;
    }

    // delay other requests
    CollectionUtils.putNew(seqNum, delayedRequests::put, () -> requests.getName() + ":delayedRequests");
    return false;
  }

  /**
   * Receive a retry from an existing request (may out-of-order).
   * The retry is ignored unless the given object is the non-replied request
   * stored in this window under its seqNum.
   *
   * @param request the request to be retried.
   * @param sendMethod invoked to send the request, unless the request has to be delayed.
   */
  public synchronized void retry(REQUEST request, Consumer<REQUEST> sendMethod) {
    if (requests.getNonRepliedRequest(request.getSeqNum(), "retry") != request) {
      // out-dated or invalid retry
      LOG.debug("{}: Ignore retry {} in {}", requests.getName(), request, this);
      return;
    }
    final boolean submitted = sendOrDelayRequest(request, sendMethod);
    LOG.debug("{}: submitting a retry {} in {}? {}",
        requests.getName(), request, this, submitted? "submitted": "delayed");
  }

  /** Remove the replied requests from the head of the window, stopping at the first non-replied request. */
  private void removeRepliedFromHead() {
    // i.remove() is in the update clause, so a request is removed only after hasReply() is checked.
    for (final Iterator<REQUEST> i = requests.iterator(); i.hasNext(); i.remove()) {
      final REQUEST r = i.next();
      if (!r.hasReply()) {
        return;
      }
    }
  }

  /**
   * Receive a reply with the given seqNum (may out-of-order).
   * It may trigger the client to send delayed requests.
   *
   * @param seqNum the seqNum of the replied request.
   * @param reply the reply.
   * @param sendMethod invoked to send the delayed requests which become sendable.
   */
  public synchronized void receiveReply(
      long seqNum, REPLY reply, Consumer<REQUEST> sendMethod) {
    if (!requests.setReply(seqNum, reply)) {
      return; // request already replied
    }
    if (seqNum == firstSeqNum) {
      firstReplied = true; // received the reply for the first submitted request
    }
    removeRepliedFromHead();
    trySendDelayed(sendMethod);
  }

  /**
   * Send the delayed requests that are now allowed to be sent.
   * A delayed seqNum whose request is no longer a non-replied request in the window
   * (for example, it was failed in the meantime) is dropped instead of being sent.
   */
  private void trySendDelayed(Consumer<REQUEST> sendMethod) {
    if (firstReplied) {
      // after first received, all other requests can be submitted (out-of-order)
      for (Long seqNum : delayedRequests.getAllAndClear()) {
        final REQUEST delayed = requests.getNonRepliedRequest(seqNum, "trySendDelayed");
        if (delayed != null) {
          // never hand a null request to the send method
          sendMethod.accept(delayed);
        }
      }
    } else {
      // Otherwise, submit the first only if it is a delayed request
      final Iterator<REQUEST> i = requests.iterator();
      if (i.hasNext()) {
        final REQUEST r = i.next();
        final Long delayed = delayedRequests.remove(r.getSeqNum());
        if (delayed != null) {
          sendOrDelayRequest(r, sendMethod);
        }
      }
    }
  }

  /**
   * Reset the {@link #firstSeqNum} since the stream has an error.
   * After the reset, the first request is considered neither submitted nor replied.
   */
  public synchronized void resetFirstSeqNum() {
    firstSeqNum = -1;
    firstReplied = false;
    LOG.debug("After resetFirstSeqNum: {}", this);
  }

  /**
   * Fail all requests starting from the given seqNum.
   * The request with the starting seqNum is failed with the given exception;
   * each later non-replied request is failed with an {@link AlreadyClosedException}
   * caused by the given exception.
   * The given exception is also recorded so that subsequent new requests are failed.
   *
   * @param startingSeqNum the seqNum to start failing from.
   * @param e the failure.
   */
  public synchronized void fail(final long startingSeqNum, Throwable e) {
    exception = e;

    if (requests.isEmpty()) {
      // Nothing is outstanding; asking an empty map for its last seqNum would throw.
      return;
    }
    // Fix the upper bound before failing any request.
    final long lastSeqNum = requests.lastSeqNum();

    boolean handled = false;
    for (long i = startingSeqNum; i <= lastSeqNum; i++) {
      final REQUEST request = requests.getNonRepliedRequest(i, "fail");
      if (request != null) {
        if (request.getSeqNum() == startingSeqNum) {
          request.fail(e);
        } else {
          alreadyClosed(request, e);
        }
        handled = true;
      }
    }

    if (handled) {
      removeRepliedFromHead();
    }
  }

  /** Fail the given request with an {@link AlreadyClosedException} caused by the given exception. */
  private void alreadyClosed(REQUEST request, Throwable e) {
    request.fail(new AlreadyClosedException(requests.getName() + " is closed.", e));
  }

  /**
   * Is the given seqNum the seqNum of the first request?
   * When no first request is recorded, the smallest seqNum in the window is used instead.
   *
   * @throws java.util.NoSuchElementException if no first request is recorded and the window is empty.
   */
  public synchronized boolean isFirst(long seqNum) {
    return seqNum == (firstSeqNum != -1 ? firstSeqNum : requests.firstSeqNum());
  }
}
/**
 * Server side sliding window.
 * A server may
 * (1) receive requests from client;
 * (2) submit the requests for processing;
 * (3) receive replies from the processing unit;
 * (4) send replies to the client.
 *
 * Requests are submitted for processing, and replies are sent, in seqNum order.
 *
 * @param <REQUEST> the request type.
 * @param <REPLY> the reply type.
 */
class Server<REQUEST extends ServerSideRequest<REPLY>, REPLY> implements Closeable {
  /** The requests in the sliding window. */
  private final RequestMap<REQUEST, REPLY> requests;
  /** The end of requests */
  private final REQUEST end;

  /** The seqNum of the next request to be submitted for processing; -1 until the first request is received. */
  private long nextToProcess = -1;

  /**
   * @param name the name of this window; used to build the name of the underlying request map.
   * @param end the end-of-requests marker; its seqNum must be {@link Long#MAX_VALUE}.
   */
  public Server(Object name, REQUEST end) {
    this.requests = new RequestMap<>(getName(getClass(), name));
    this.end = end;
    Preconditions.assertTrue(end.getSeqNum() == Long.MAX_VALUE);
  }

  @Override
  public synchronized String toString() {
    return requests + ", nextToProcess=" + nextToProcess;
  }

  /**
   * A request (or a retry) arrives (may be out-of-order except for the first request).
   * A retry, i.e. a request whose seqNum is already in the window, is ignored.
   *
   * @param request the arriving request.
   * @param processingMethod invoked, in seqNum order, on each request which becomes ready for processing.
   */
  public synchronized void receivedRequest(REQUEST request, Consumer<REQUEST> processingMethod) {
    final long seqNum = request.getSeqNum();
    final boolean startsWindow = nextToProcess == -1 && (request.isFirstRequest() || seqNum == 0);
    if (!startsWindow) {
      final boolean isRetry = requests.putIfAbsent(request);
      LOG.debug("Received seq={}, isRetry? {}, {}", seqNum, isRetry, this);
      if (isRetry) {
        return;
      }
    } else {
      // Processing starts from the seqNum of the first request.
      nextToProcess = seqNum;
      requests.putNewRequest(request);
      LOG.debug("Received seq={} (first request), {}", seqNum, this);
    }

    processRequestsFromHead(processingMethod);
  }

  /**
   * Submit the requests for processing, starting from {@link #nextToProcess},
   * until there is a gap in the seqNums.
   * Requests with a seqNum before {@link #nextToProcess} are skipped.
   */
  private void processRequestsFromHead(Consumer<REQUEST> processingMethod) {
    final Iterator<REQUEST> i = requests.iterator();
    while (i.hasNext()) {
      final REQUEST r = i.next();
      if (r.getSeqNum() > nextToProcess) {
        // gap: the request with seqNum == nextToProcess has not yet arrived
        break;
      }
      if (r.getSeqNum() == nextToProcess) {
        processingMethod.accept(r);
        r.release();
        nextToProcess++;
      }
    }
  }

  /**
   * Receives a reply for the given seqNum (may out-of-order) from the processor.
   * It may trigger sending replies to client.
   *
   * @param seqNum the seqNum of the replied request.
   * @param reply the reply.
   * @param replyMethod invoked, in seqNum order, on each request whose reply can be sent.
   */
  public synchronized void receiveReply(long seqNum, REPLY reply, Consumer<REQUEST> replyMethod) {
    if (requests.setReply(seqNum, reply)) {
      sendRepliesFromHead(replyMethod);
    }
    // Otherwise, the request already replied.
  }

  /**
   * Send the replies from the head of the window and remove the replied requests,
   * stopping at the first request without a reply.
   * The end request is replied but it is not removed.
   */
  private void sendRepliesFromHead(Consumer<REQUEST> replyMethod) {
    final Iterator<REQUEST> i = requests.iterator();
    while (i.hasNext()) {
      final REQUEST r = i.next();
      if (!r.hasReply()) {
        return;
      }
      replyMethod.accept(r);
      if (r == end) {
        return;
      }
      i.remove();
    }
  }

  /**
   * Signal the end of requests.
   * If there are outstanding requests, the requests that will never be processed
   * are failed and replied, and the end request is put into the window.
   *
   * @param replyMethod invoked on each request failed by this method.
   * @return true if no more outstanding requests.
   */
  public synchronized boolean endOfRequests(Consumer<REQUEST> replyMethod) {
    final boolean noOutstanding = requests.isEmpty();
    if (!noOutstanding) {
      LOG.debug("{}: put end-of-request in {}", requests.getName(), this);
      requests.endOfRequests(nextToProcess, end, replyMethod);
    }
    return noOutstanding;
  }

  /**
   * Release the requests which are not yet submitted for processing and then clear the window.
   * NOTE(review): unlike the other methods, this method is not synchronized
   * although it reads {@link #nextToProcess} -- confirm that this is intended.
   */
  @Override
  public void close() {
    requests.clear(nextToProcess);
  }
}
}