blob: 44df5c3ff646f12e639cabd833b028189f4e91a6 [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.ratis.client.impl;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.client.RaftClientConfigKeys;
import org.apache.ratis.client.RaftClientRpc;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.*;
import org.apache.ratis.shaded.proto.RaftProtos.ReplicationLevel;
import org.apache.ratis.util.*;
import java.util.Arrays;
import java.util.Collection;
import java.util.Objects;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.LongFunction;
import java.util.function.Supplier;
import static org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto.TypeCase.STALEREAD;
/** A client who sends requests to a raft service. */
final class RaftClientImpl implements RaftClient {
private static final AtomicLong callIdCounter = new AtomicLong();
private static long nextCallId() {
return callIdCounter.getAndIncrement() & Long.MAX_VALUE;
static class PendingAsyncRequest implements SlidingWindow.Request<RaftClientReply> {
private final long seqNum;
private final LongFunction<RaftClientRequest> requestConstructor;
private final CompletableFuture<RaftClientReply> replyFuture = new CompletableFuture<>();
PendingAsyncRequest(long seqNum, LongFunction<RaftClientRequest> requestConstructor) {
this.seqNum = seqNum;
this.requestConstructor = requestConstructor;
RaftClientRequest newRequest() {
return requestConstructor.apply(seqNum);
public long getSeqNum() {
return seqNum;
public boolean hasReply() {
return replyFuture.isDone();
public void setReply(RaftClientReply reply) {
CompletableFuture<RaftClientReply> getReplyFuture() {
return replyFuture;
public String toString() {
return "[seq=" + getSeqNum() + "]";
/** Id of this client; returned by getId() and passed into every request constructed in this class. */
private final ClientId clientId;
/** The rpc through which requests are sent, both blocking and async. */
private final RaftClientRpc clientRpc;
/** The peers known to this client; initialized from the group passed to the constructor. */
private final Collection<RaftPeer> peers;
/** Id of the group passed to the constructor; put into every request constructed in this class. */
private final RaftGroupId groupId;
/** Delay between retries; read from {@link RaftClientConfigKeys.Rpc} in the constructor. */
private final TimeDuration retryInterval;
/**
 * The server currently assumed to be the leader.
 * It is the target of requests that do not name a server and is replaced in handleIOException.
 */
private volatile RaftPeerId leaderId;
/**
 * Map: id -> {@link SlidingWindow}, in order to support async calls to the RAFT service or individual servers.
 * The id is the string form of the target server, or "RAFT" when no target server is given.
 */
private final ConcurrentMap<String, SlidingWindow.Client<PendingAsyncRequest, RaftClientReply>>
slidingWindows = new ConcurrentHashMap<>();
/** Runs the delayed retries of async requests. */
private final ScheduledExecutorService scheduler;
/** Released when an async request completes; its permits come from {@link RaftClientConfigKeys.Async}. */
private final Semaphore asyncRequestSemaphore;
RaftClientImpl(ClientId clientId, RaftGroup group, RaftPeerId leaderId,
RaftClientRpc clientRpc, RaftProperties properties) {
this.clientId = clientId;
this.clientRpc = clientRpc;
this.peers = new ConcurrentLinkedQueue<>(group.getPeers());
this.groupId = group.getGroupId();
this.leaderId = leaderId != null? leaderId
: !peers.isEmpty()? peers.iterator().next().getId(): null;
this.retryInterval = RaftClientConfigKeys.Rpc.retryInterval(properties);
asyncRequestSemaphore = new Semaphore(RaftClientConfigKeys.Async.maxOutstandingRequests(properties));
scheduler = Executors.newScheduledThreadPool(RaftClientConfigKeys.Async.schedulerThreads(properties));
public ClientId getId() {
return clientId;
private SlidingWindow.Client<PendingAsyncRequest, RaftClientReply> getSlidingWindow(RaftClientRequest request) {
return getSlidingWindow( request.getServerId(): null);
private SlidingWindow.Client<PendingAsyncRequest, RaftClientReply> getSlidingWindow(RaftPeerId target) {
final String id = target != null? target.toString(): "RAFT";
return slidingWindows.computeIfAbsent(id, key -> new SlidingWindow.Client<>(getId() + "->" + key));
public CompletableFuture<RaftClientReply> sendAsync(Message message, ReplicationLevel replication) {
return sendAsync(RaftClientRequest.writeRequestType(replication), message, null);
public CompletableFuture<RaftClientReply> sendReadOnlyAsync(Message message) {
return sendAsync(RaftClientRequest.readRequestType(), message, null);
public CompletableFuture<RaftClientReply> sendStaleReadAsync(Message message, long minIndex, RaftPeerId server) {
return sendAsync(RaftClientRequest.staleReadRequestType(minIndex), message, server);
private CompletableFuture<RaftClientReply> sendAsync(
RaftClientRequest.Type type, Message message, RaftPeerId server) {
Objects.requireNonNull(message, "message == null");
try {
} catch (InterruptedException e) {
throw new CompletionException(IOUtils.toInterruptedIOException(
"Interrupted when sending " + message, e));
final long callId = nextCallId();
final LongFunction<PendingAsyncRequest> constructor = seqNum -> new PendingAsyncRequest(seqNum,
seq -> newRaftClientRequest(server, callId, seq, message, type));
return getSlidingWindow(server).submitNewRequest(constructor, this::sendRequestWithRetryAsync
).thenApply(reply -> handleStateMachineException(reply, CompletionException::new)
).whenComplete((r, e) -> asyncRequestSemaphore.release());
private RaftClientRequest newRaftClientRequest(
RaftPeerId server, long callId, long seq, Message message, RaftClientRequest.Type type) {
return new RaftClientRequest(clientId, server != null? server: leaderId, groupId,
callId, seq, message, type);
public RaftClientReply send(Message message, ReplicationLevel replication) throws IOException {
return send(RaftClientRequest.writeRequestType(replication), message, null);
public RaftClientReply sendReadOnly(Message message) throws IOException {
return send(RaftClientRequest.readRequestType(), message, null);
public RaftClientReply sendStaleRead(Message message, long minIndex, RaftPeerId server)
throws IOException {
return send(RaftClientRequest.staleReadRequestType(minIndex), message, server);
private RaftClientReply send(RaftClientRequest.Type type, Message message, RaftPeerId server)
throws IOException {
Objects.requireNonNull(message, "message == null");
final long callId = nextCallId();
return sendRequestWithRetry(() -> newRaftClientRequest(
server, callId, 0L, message, type));
public RaftClientReply setConfiguration(RaftPeer[] peersInNewConf)
throws IOException {
Objects.requireNonNull(peersInNewConf, "peersInNewConf == null");
final long callId = nextCallId();
// also refresh the rpc proxies for these peers
return sendRequestWithRetry(() -> new SetConfigurationRequest(
clientId, leaderId, groupId, callId, peersInNewConf));
public RaftClientReply reinitialize(RaftGroup newGroup, RaftPeerId server)
throws IOException {
Objects.requireNonNull(newGroup, "newGroup == null");
Objects.requireNonNull(server, "server == null");
final long callId = nextCallId();
return sendRequest(new ReinitializeRequest(
clientId, server, groupId, callId, newGroup));
public RaftClientReply serverInformation(RaftPeerId server)
throws IOException {
Objects.requireNonNull(server, "server == null");
return sendRequest(new ServerInformationRequest(clientId, server,
groupId, nextCallId()));
private void addServers(Stream<RaftPeer> peersInNewConf) {
peersInNewConf.filter(p -> !peers.contains(p))::iterator);
private CompletableFuture<RaftClientReply> sendRequestWithRetryAsync(
PendingAsyncRequest pending) {
final RaftClientRequest request = pending.newRequest();
final CompletableFuture<RaftClientReply> f = pending.getReplyFuture();
return sendRequestAsync(request).thenCompose(reply -> {
if (reply == null) {
final TimeUnit unit = retryInterval.getUnit();
scheduler.schedule(() -> getSlidingWindow(request).retry(pending, this::sendRequestWithRetryAsync),
retryInterval.toLong(unit), unit);
} else {
return f;
private RaftClientReply sendRequestWithRetry(
Supplier<RaftClientRequest> supplier)
throws InterruptedIOException, StateMachineException, GroupMismatchException {
for(;;) {
final RaftClientRequest request = supplier.get();
final RaftClientReply reply = sendRequest(request);
if (reply != null) {
return reply;
// sleep and then retry
try {
} catch (InterruptedException ie) {
throw IOUtils.toInterruptedIOException(
"Interrupted when sending " + request, ie);
private CompletableFuture<RaftClientReply> sendRequestAsync(
RaftClientRequest request) {
LOG.debug("{}: send* {}", clientId, request);
return clientRpc.sendRequestAsync(request).thenApply(reply -> {
LOG.debug("{}: receive* {}", clientId, reply);
reply = handleNotLeaderException(request, reply);
if (reply != null) {
request.getSeqNum(), reply, this::sendRequestWithRetryAsync);
return reply;
}).exceptionally(e -> {
LOG.debug("{}: Failed {} with {}", clientId, request, e);
e = JavaUtils.unwrapCompletionException(e);
if (e instanceof GroupMismatchException) {
throw new CompletionException(e);
} else if (e instanceof IOException) {
handleIOException(request, (IOException)e, null);
} else {
throw new CompletionException(e);
return null;
private RaftClientReply sendRequest(RaftClientRequest request)
throws StateMachineException, GroupMismatchException {
LOG.debug("{}: send {}", clientId, request);
RaftClientReply reply = null;
try {
reply = clientRpc.sendRequest(request);
} catch (GroupMismatchException gme) {
throw gme;
} catch (IOException ioe) {
handleIOException(request, ioe, null);
LOG.debug("{}: receive {}", clientId, reply);
reply = handleNotLeaderException(request, reply);
reply = handleStateMachineException(reply, Function.identity());
return reply;
static <E extends Throwable> RaftClientReply handleStateMachineException(
RaftClientReply reply, Function<StateMachineException, E> converter) throws E {
if (reply != null) {
final StateMachineException sme = reply.getStateMachineException();
if (sme != null) {
throw converter.apply(sme);
return reply;
* @return null if the reply is null or it has {@link NotLeaderException};
* otherwise return the same reply.
private RaftClientReply handleNotLeaderException(RaftClientRequest request, RaftClientReply reply) {
if (reply == null) {
return null;
final NotLeaderException nle = reply.getNotLeaderException();
if (nle == null) {
return reply;
final RaftPeerId newLeader = nle.getSuggestedLeader() == null ? null
: nle.getSuggestedLeader().getId();
handleIOException(request, nle, newLeader);
return null;
private void refreshPeers(Collection<RaftPeer> newPeers) {
if (newPeers != null && newPeers.size() > 0) {
// also refresh the rpc proxies for these peers
private void handleIOException(RaftClientRequest request, IOException ioe,
RaftPeerId newLeader) {
LOG.debug("{}: suggested new leader: {}. Failed {} with {}",
clientId, newLeader, request, ioe);
if (LOG.isTraceEnabled()) {
LOG.trace("Stack trace", new Throwable("TRACE"));
if (ioe instanceof LeaderNotReadyException) {
final RaftPeerId oldLeader = request.getServerId();
final boolean stillLeader = oldLeader.equals(leaderId);
if (newLeader == null && stillLeader) {
newLeader = CollectionUtils.random(oldLeader,, RaftPeer::getId));
final boolean changeLeader = newLeader != null && stillLeader;
if (changeLeader) {
LOG.debug("{}: change Leader from {} to {}", clientId, oldLeader, newLeader);
this.leaderId = newLeader;
clientRpc.handleException(oldLeader, ioe, changeLeader);
void assertAsyncRequestSemaphore(int expectedAvailablePermits, int expectedQueueLength) {
Preconditions.assertTrue(asyncRequestSemaphore.availablePermits() == expectedAvailablePermits);
Preconditions.assertTrue(asyncRequestSemaphore.getQueueLength() == expectedQueueLength);
void assertScheduler(int numThreads) {
Preconditions.assertTrue(((ScheduledThreadPoolExecutor) scheduler).getCorePoolSize() == numThreads);
long getCallId() {
return callIdCounter.get();
public RaftClientRpc getClientRpc() {
return clientRpc;
public void close() throws IOException {