RATIS-2012. Client should not retry after close. (#1025)
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/BlockingImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/BlockingImpl.java
index 4be9fa3..7698780 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/BlockingImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/BlockingImpl.java
@@ -119,16 +119,18 @@
ioe = e;
}
- pending.incrementExceptionCount(ioe);
- ClientRetryEvent event = new ClientRetryEvent(request, ioe, pending);
- final RetryPolicy retryPolicy = client.getRetryPolicy();
- final RetryPolicy.Action action = retryPolicy.handleAttemptFailure(event);
- TimeDuration sleepTime = client.getEffectiveSleepTime(ioe, action.getSleepTime());
-
- if (!action.shouldRetry()) {
- throw (IOException)client.noMoreRetries(event);
+ if (client.isClosed()) {
+ throw new AlreadyClosedException(this + " is closed.");
}
+ final ClientRetryEvent event = pending.newClientRetryEvent(request, ioe);
+ final RetryPolicy retryPolicy = client.getRetryPolicy();
+ final RetryPolicy.Action action = retryPolicy.handleAttemptFailure(event);
+ if (!action.shouldRetry()) {
+ throw client.noMoreRetries(event);
+ }
+
+ final TimeDuration sleepTime = client.getEffectiveSleepTime(ioe, action.getSleepTime());
try {
sleepTime.sleep();
} catch (InterruptedException e) {
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
index a1aa586..34dc3be 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
@@ -149,10 +149,6 @@
getSlidingWindow(request).fail(request.getSlidingWindowEntry().getSeqNum(), t);
}
- private void handleAsyncRetryFailure(ClientRetryEvent event) {
- failAllAsyncRequests(event.getRequest(), client.noMoreRetries(event));
- }
-
CompletableFuture<RaftClientReply> send(RaftClientRequest.Type type, Message message, RaftPeerId server) {
if (!type.is(TypeCase.WATCH) && !type.is(TypeCase.MESSAGESTREAM)) {
Objects.requireNonNull(message, "message == null");
@@ -187,85 +183,68 @@
if (pending == null) {
return;
}
-
- final CompletableFuture<RaftClientReply> f = pending.getReplyFuture();
- if (f.isDone()) {
+ if (pending.getReplyFuture().isDone()) {
return;
}
- final RaftClientRequest request = pending.newRequestImpl();
+ final RaftClientRequest request = pending.newRequest();
if (request == null) { // already done
- LOG.debug("{} newRequestImpl returns null", pending);
+ LOG.debug("{} newRequest returns null", pending);
return;
}
- final RetryPolicy retryPolicy = client.getRetryPolicy();
- sendRequest(pending).exceptionally(e -> {
- if (e instanceof CompletionException) {
- e = JavaUtils.unwrapCompletionException(e);
- scheduleWithTimeout(pending, request, retryPolicy, e);
- return null;
- }
- f.completeExceptionally(e);
- return null;
- });
- }
-
- private void scheduleWithTimeout(PendingOrderedRequest pending,
- RaftClientRequest request, RetryPolicy retryPolicy, Throwable e) {
- final int attempt = pending.getAttemptCount();
- final ClientRetryEvent event = new ClientRetryEvent(request, e, pending);
- final TimeDuration sleepTime = client.getEffectiveSleepTime(e,
- retryPolicy.handleAttemptFailure(event).getSleepTime());
- LOG.debug("schedule* attempt #{} with sleep {} and policy {} for {}", attempt, sleepTime, retryPolicy, request);
- scheduleWithTimeout(pending, sleepTime, getSlidingWindow(request));
- }
-
- private void scheduleWithTimeout(PendingOrderedRequest pending, TimeDuration sleepTime,
- SlidingWindow.Client<PendingOrderedRequest, RaftClientReply> slidingWindow) {
- client.getScheduler().onTimeout(sleepTime,
- () -> slidingWindow.retry(pending, this::sendRequestWithRetry),
- LOG, () -> "Failed* to retry " + pending);
- }
-
- private CompletableFuture<RaftClientReply> sendRequest(PendingOrderedRequest pending) {
- final RetryPolicy retryPolicy = client.getRetryPolicy();
- final RaftClientRequest request;
if (getSlidingWindow((RaftPeerId) null).isFirst(pending.getSeqNum())) {
pending.setFirstRequest();
}
- request = pending.newRequest();
LOG.debug("{}: send* {}", client.getId(), request);
- return client.getClientRpc().sendRequestAsync(request).thenApply(reply -> {
+ client.getClientRpc().sendRequestAsync(request).thenAccept(reply -> {
LOG.debug("{}: receive* {}", client.getId(), reply);
Objects.requireNonNull(reply, "reply == null");
client.handleReply(request, reply);
getSlidingWindow(request).receiveReply(
request.getSlidingWindowEntry().getSeqNum(), reply, this::sendRequestWithRetry);
- return reply;
}).exceptionally(e -> {
LOG.error(client.getId() + ": Failed* " + request, e);
- e = JavaUtils.unwrapCompletionException(e);
- if (e instanceof IOException && !(e instanceof GroupMismatchException)) {
- pending.incrementExceptionCount(e);
- final ClientRetryEvent event = new ClientRetryEvent(request, e, pending);
- if (!retryPolicy.handleAttemptFailure(event).shouldRetry()) {
- handleAsyncRetryFailure(event);
- } else {
- if (e instanceof NotLeaderException) {
- NotLeaderException nle = (NotLeaderException)e;
- client.handleNotLeaderException(request, nle, this::resetSlidingWindow);
- } else {
- client.handleIOException(request, (IOException) e, null, this::resetSlidingWindow);
- }
- }
- throw new CompletionException(e);
- }
- failAllAsyncRequests(request, e);
+ handleException(pending, request, e);
return null;
});
}
+ private void handleException(PendingOrderedRequest pending, RaftClientRequest request, Throwable e) {
+ final RetryPolicy retryPolicy = client.getRetryPolicy();
+ if (client.isClosed()) {
+ failAllAsyncRequests(request, new AlreadyClosedException(client + " is closed."));
+ return;
+ }
+
+ e = JavaUtils.unwrapCompletionException(e);
+ if (!(e instanceof IOException) || e instanceof GroupMismatchException) {
+ // non-retryable exceptions
+ failAllAsyncRequests(request, e);
+ return;
+ }
+
+ final ClientRetryEvent event = pending.newClientRetryEvent(request, e);
+ final RetryPolicy.Action action = retryPolicy.handleAttemptFailure(event);
+ if (!action.shouldRetry()) {
+ failAllAsyncRequests(request, client.noMoreRetries(event));
+ return;
+ }
+
+ if (e instanceof NotLeaderException) {
+ client.handleNotLeaderException(request, (NotLeaderException) e, this::resetSlidingWindow);
+ } else {
+ client.handleIOException(request, (IOException) e, null, this::resetSlidingWindow);
+ }
+ final TimeDuration sleepTime = client.getEffectiveSleepTime(e, action.getSleepTime());
+ LOG.debug("schedule* retry with sleep {} for attempt #{} of {}, {}",
+ sleepTime, event.getAttemptCount(), request, retryPolicy);
+ final SlidingWindow.Client<PendingOrderedRequest, RaftClientReply> slidingWindow = getSlidingWindow(request);
+ client.getScheduler().onTimeout(sleepTime,
+ () -> slidingWindow.retry(pending, this::sendRequestWithRetry),
+ LOG, () -> "Failed* to retry " + pending);
+ }
+
void assertRequestSemaphore(int expectedAvailablePermits, int expectedQueueLength) {
Preconditions.assertSame(expectedAvailablePermits, requestSemaphore.availablePermits(), "availablePermits");
Preconditions.assertSame(expectedQueueLength, requestSemaphore.getQueueLength(), "queueLength");
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index f423919..1b82709 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -44,11 +44,13 @@
import org.apache.ratis.thirdparty.com.google.common.cache.Cache;
import org.apache.ratis.thirdparty.com.google.common.cache.CacheBuilder;
import org.apache.ratis.util.CollectionUtils;
+import org.apache.ratis.util.IOUtils;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.MemoizedSupplier;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.TimeDuration;
import org.apache.ratis.util.TimeoutExecutor;
+import org.apache.ratis.util.Timestamp;
import java.io.IOException;
import java.util.ArrayList;
@@ -65,6 +67,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
@@ -79,10 +82,10 @@
.build();
public abstract static class PendingClientRequest {
- private final long creationTimeInMs = System.currentTimeMillis();
+ private final Timestamp creationTime = Timestamp.currentTime();
private final CompletableFuture<RaftClientReply> replyFuture = new CompletableFuture<>();
private final AtomicInteger attemptCount = new AtomicInteger();
- private final Map<Class<?>, Integer> exceptionCount = new ConcurrentHashMap<>();
+ private final Map<Class<?>, Integer> exceptionCounts = new ConcurrentHashMap<>();
public abstract RaftClientRequest newRequestImpl();
@@ -101,19 +104,10 @@
return attemptCount.get();
}
- int incrementExceptionCount(Throwable t) {
- return t != null ? exceptionCount.compute(t.getClass(), (k, v) -> v != null ? v + 1 : 1) : 0;
- }
-
- public int getExceptionCount(Throwable t) {
- return t != null ? Optional.ofNullable(exceptionCount.get(t.getClass())).orElse(0) : 0;
- }
-
- public boolean isRequestTimeout(TimeDuration timeout) {
- if (timeout == null) {
- return false;
- }
- return System.currentTimeMillis() - creationTimeInMs > timeout.toLong(TimeUnit.MILLISECONDS);
+ public ClientRetryEvent newClientRetryEvent(RaftClientRequest request, Throwable throwable) {
+ final int exceptionCount = throwable == null? 0
+ : exceptionCounts.compute(throwable.getClass(), (k, v) -> v == null? 1: v+1);
+ return new ClientRetryEvent(getAttemptCount(), request, exceptionCount, throwable, creationTime);
}
}
@@ -196,6 +190,8 @@
private final ConcurrentMap<RaftPeerId, LeaderElectionManagementApi>
leaderElectionManagement = new ConcurrentHashMap<>();
+ private final AtomicBoolean closed = new AtomicBoolean();
+
@SuppressWarnings("checkstyle:ParameterNumber")
RaftClientImpl(ClientId clientId, RaftGroup group, RaftPeerId leaderId, RaftPeer primaryDataStreamServer,
RaftClientRpc clientRpc, RetryPolicy retryPolicy, RaftProperties properties, Parameters parameters) {
@@ -346,11 +342,11 @@
return dataStreamApi.get();
}
- Throwable noMoreRetries(ClientRetryEvent event) {
+ IOException noMoreRetries(ClientRetryEvent event) {
final int attemptCount = event.getAttemptCount();
final Throwable throwable = event.getCause();
if (attemptCount == 1 && throwable != null) {
- return throwable;
+ return IOUtils.asIOException(throwable);
}
return new RaftRetryFailureException(event.getRequest(), attemptCount, retryPolicy, throwable);
}
@@ -418,8 +414,7 @@
void handleIOException(RaftClientRequest request, IOException ioe,
RaftPeerId newLeader, Consumer<RaftClientRequest> handler) {
- LOG.debug("{}: suggested new leader: {}. Failed {} with {}",
- clientId, newLeader, request, ioe);
+ LOG.debug("{}: suggested new leader: {}. Failed {}", clientId, newLeader, request, ioe);
if (LOG.isTraceEnabled()) {
LOG.trace("Stack trace", new Throwable("TRACE"));
}
@@ -456,8 +451,17 @@
return clientRpc;
}
+ boolean isClosed() {
+ return closed.get();
+ }
+
@Override
public void close() throws IOException {
+ if (!closed.compareAndSet(false, true)) {
+ return;
+ }
+
+ LOG.debug("close {}", getId());
clientRpc.close();
if (dataStreamApi.isInitialized()) {
dataStreamApi.get().close();
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java
index 84b817b..eccda4d 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java
@@ -22,6 +22,7 @@
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
import org.apache.ratis.protocol.exceptions.GroupMismatchException;
import org.apache.ratis.protocol.exceptions.NotLeaderException;
import org.apache.ratis.protocol.RaftClientReply;
@@ -89,11 +90,14 @@
}
final Throwable cause = replyException != null ? replyException : e;
- pending.incrementExceptionCount(cause);
- final ClientRetryEvent event = new ClientRetryEvent(request, cause, pending);
+ if (client.isClosed()) {
+ f.completeExceptionally(new AlreadyClosedException(client + " is closed"));
+ return;
+ }
+
+ final ClientRetryEvent event = pending.newClientRetryEvent(request, cause);
RetryPolicy retryPolicy = client.getRetryPolicy();
final RetryPolicy.Action action = retryPolicy.handleAttemptFailure(event);
- TimeDuration sleepTime = client.getEffectiveSleepTime(cause, action.getSleepTime());
if (!action.shouldRetry()) {
f.completeExceptionally(client.noMoreRetries(event));
return;
@@ -124,7 +128,9 @@
}
}
- LOG.debug("schedule retry for attempt #{}, policy={}, request={}", attemptCount, retryPolicy, request);
+ final TimeDuration sleepTime = client.getEffectiveSleepTime(cause, action.getSleepTime());
+ LOG.debug("schedule~ attempt #{} with sleep {} and policy {} for {}",
+ attemptCount, sleepTime, retryPolicy, request);
client.getScheduler().onTimeout(sleepTime,
() -> sendRequestWithRetry(pending, client), LOG, () -> clientId + ": Failed~ to retry " + request);
} catch (Exception ex) {
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/retry/ClientRetryEvent.java b/ratis-client/src/main/java/org/apache/ratis/client/retry/ClientRetryEvent.java
index f0c38ef..c6a8beb 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/retry/ClientRetryEvent.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/retry/ClientRetryEvent.java
@@ -17,12 +17,11 @@
*/
package org.apache.ratis.client.retry;
-import org.apache.ratis.client.impl.RaftClientImpl.PendingClientRequest;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.retry.RetryPolicy;
-import org.apache.ratis.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.TimeDuration;
+import org.apache.ratis.util.Timestamp;
/** An {@link RetryPolicy.Event} specific to client request failure. */
public class ClientRetryEvent implements RetryPolicy.Event {
@@ -30,23 +29,15 @@
private final int causeCount;
private final RaftClientRequest request;
private final Throwable cause;
- private PendingClientRequest pending;
+ private final Timestamp pendingRequestCreationTime;
- @VisibleForTesting
- public ClientRetryEvent(int attemptCount, RaftClientRequest request, Throwable cause) {
- this(attemptCount, request, attemptCount, cause);
- }
-
- public ClientRetryEvent(RaftClientRequest request, Throwable t, PendingClientRequest pending) {
- this(pending.getAttemptCount(), request, pending.getExceptionCount(t), t);
- this.pending = pending;
- }
-
- private ClientRetryEvent(int attemptCount, RaftClientRequest request, int causeCount, Throwable cause) {
+ public ClientRetryEvent(int attemptCount, RaftClientRequest request, int causeCount, Throwable cause,
+ Timestamp pendingRequestCreationTime) {
this.attemptCount = attemptCount;
this.causeCount = causeCount;
this.request = request;
this.cause = cause;
+ this.pendingRequestCreationTime = pendingRequestCreationTime;
}
@Override
@@ -69,7 +60,7 @@
}
boolean isRequestTimeout(TimeDuration timeout) {
- return pending != null && pending.isRequestTimeout(timeout);
+ return timeout != null && pendingRequestCreationTime.elapsedTime().compareTo(timeout) >= 0;
}
@Override
@@ -77,6 +68,7 @@
return JavaUtils.getClassSimpleName(getClass())
+ ":attempt=" + attemptCount
+ ",request=" + request
- + ",cause=" + cause;
+ + ",cause=" + cause
+ + ",causeCount=" + causeCount;
}
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java b/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java
index 105ecbf..0ce0595 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java
@@ -36,6 +36,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
/** A map from peer id to peer and its proxy. */
public class PeerProxyMap<PROXY extends Closeable> implements RaftPeer.Add, Closeable {
@@ -65,7 +66,7 @@
throw new AlreadyClosedException(name + " is already " + current);
}
lifeCycle.startAndTransition(
- () -> proxy = createProxy.apply(peer), IOException.class);
+ () -> proxy = createProxyImpl(peer), IOException.class);
}
}
}
@@ -92,6 +93,7 @@
private final Object resetLock = new Object();
private final CheckedFunction<RaftPeer, PROXY, IOException> createProxy;
+ private final AtomicBoolean closed = new AtomicBoolean();
public PeerProxyMap(String name, CheckedFunction<RaftPeer, PROXY, IOException> createProxy) {
this.name = name;
@@ -102,6 +104,13 @@
return name;
}
+ private PROXY createProxyImpl(RaftPeer peer) throws IOException {
+ if (closed.get()) {
+ throw new AlreadyClosedException(name + ": Failed to create proxy for " + peer);
+ }
+ return createProxy.apply(peer);
+ }
+
public PROXY getProxy(RaftPeerId id) throws IOException {
Objects.requireNonNull(id, "id == null");
PeerAndProxy p = peers.get(id);
@@ -161,6 +170,10 @@
@Override
public void close() {
+ if (!closed.compareAndSet(false, true)) {
+ return;
+ }
+
final List<IOException> exceptions = Collections.synchronizedList(new ArrayList<>());
ConcurrentUtils.parallelForEachAsync(peers.values(),
pp -> pp.setNullProxyAndClose().map(proxy -> closeProxy(proxy, pp)).ifPresent(exceptions::add),
@@ -180,7 +193,7 @@
private IOException closeProxy(PROXY proxy, PeerAndProxy pp) {
try {
- LOG.debug("{}: Closing proxy for peer {}", name, pp);
+ LOG.debug("{}: Closing proxy {} {} for peer {}", name, proxy.getClass().getSimpleName(), proxy, pp);
proxy.close();
return null;
} catch (IOException e) {
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
index 260f601..71c5c5e 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
@@ -372,7 +372,7 @@
@Test
public void testStateMachineMetrics() throws Exception {
runWithNewCluster(NUM_SERVERS, cluster ->
- RaftBasicTests.testStateMachineMetrics(true, cluster, LOG));
+ RaftBasicTests.runTestStateMachineMetrics(true, cluster));
}
@Test
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
index 47c9b0e..4ff9681 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
@@ -168,12 +168,10 @@
final List<RaftServer.Division> divisions = cluster.getServerAliveStream().collect(Collectors.toList());
for(RaftServer.Division impl: divisions) {
- JavaUtils.attempt(() -> RaftTestUtil.assertLogEntries(impl, term, messages),
- 50, TimeDuration.valueOf(1, TimeUnit.SECONDS), impl.getId() + " assertLogEntries", LOG);
+ RaftTestUtil.assertLogEntries(impl, term, messages, 50, LOG);
}
}
-
@Test
public void testOldLeaderCommit() throws Exception {
runWithNewCluster(NUM_SERVERS, this::runTestOldLeaderCommit);
@@ -218,7 +216,7 @@
cluster.getServerAliveStream()
.map(RaftServer.Division::getRaftLog)
- .forEach(log -> RaftTestUtil.assertLogEntries(log, term, messages));
+ .forEach(log -> RaftTestUtil.assertLogEntries(log, term, messages, System.out::println));
}
@Test
@@ -453,8 +451,12 @@
}
}
- public static void testStateMachineMetrics(boolean async,
- MiniRaftCluster cluster, Logger LOG) throws Exception {
+ @Test
+ public void testStateMachineMetrics() throws Exception {
+ runWithNewCluster(NUM_SERVERS, cluster -> runTestStateMachineMetrics(false, cluster));
+ }
+
+ static void runTestStateMachineMetrics(boolean async, MiniRaftCluster cluster) throws Exception {
RaftServer.Division leader = waitForLeader(cluster);
try (final RaftClient client = cluster.createClient()) {
Gauge appliedIndexGauge = getStatemachineGaugeWithName(leader,
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
index fa41887..41a4311 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
@@ -249,19 +249,16 @@
}
}
- static void assertLogEntries(RaftServer.Division server, long expectedTerm, SimpleMessage... expectedMessages) {
- LOG.info("checking raft log for {}", server.getMemberId());
- final RaftLog log = server.getRaftLog();
- try {
- RaftTestUtil.assertLogEntries(log, expectedTerm, expectedMessages);
- } catch (AssertionError e) {
- LOG.error("Unexpected raft log in {}", server.getMemberId(), e);
- throw e;
- }
+ static void assertLogEntries(RaftServer.Division server, long expectedTerm, SimpleMessage[] expectedMessages,
+ int numAttempts, Logger log) throws Exception {
+ final String name = server.getId() + " assertLogEntries";
+ final Function<Integer, Consumer<String>> print = i -> i < numAttempts? s -> {}: System.out::println;
+ JavaUtils.attempt(i -> assertLogEntries(server.getRaftLog(), expectedTerm, expectedMessages, print.apply(i)),
+ numAttempts, TimeDuration.ONE_SECOND, () -> name, log);
}
static Iterable<LogEntryProto> getLogEntryProtos(RaftLog log) {
- return CollectionUtils.as(log.getEntries(0, Long.MAX_VALUE), ti -> {
+ return CollectionUtils.as(log.getEntries(0, log.getLastEntryTermIndex().getIndex() + 1), ti -> {
try {
return log.get(ti.getIndex());
} catch (IOException exception) {
@@ -270,17 +267,17 @@
});
}
- static List<LogEntryProto> getStateMachineLogEntries(RaftLog log) {
+ static List<LogEntryProto> getStateMachineLogEntries(RaftLog log, Consumer<String> print) {
final List<LogEntryProto> entries = new ArrayList<>();
for (LogEntryProto e : getLogEntryProtos(log)) {
final String s = LogProtoUtils.toLogEntryString(e);
if (e.hasStateMachineLogEntry()) {
- LOG.info(s + ", " + e.getStateMachineLogEntry().toString().trim().replace("\n", ", "));
+ print.accept(entries.size() + ") " + s);
entries.add(e);
} else if (e.hasConfigurationEntry()) {
- LOG.info("Found {}, ignoring it.", s);
+ print.accept("Ignoring " + s);
} else if (e.hasMetadataEntry()) {
- LOG.info("Found {}, ignoring it.", s);
+ print.accept("Ignoring " + s);
} else {
throw new AssertionError("Unexpected LogEntryBodyCase " + e.getLogEntryBodyCase() + " at " + s);
}
@@ -288,13 +285,14 @@
return entries;
}
- static void assertLogEntries(RaftLog log, long expectedTerm, SimpleMessage... expectedMessages) {
- final List<LogEntryProto> entries = getStateMachineLogEntries(log);
+ static Void assertLogEntries(RaftLog log, long expectedTerm, SimpleMessage[] expectedMessages, Consumer<String> print) {
+ final List<LogEntryProto> entries = getStateMachineLogEntries(log, print);
try {
assertLogEntries(entries, expectedTerm, expectedMessages);
} catch(Exception t) {
throw new AssertionError("entries: " + entries, t);
}
+ return null;
}
static void assertLogEntries(List<LogEntryProto> entries, long expectedTerm, SimpleMessage... expectedMessages) {
diff --git a/ratis-test/src/test/java/org/apache/ratis/RaftLogTruncateTests.java b/ratis-test/src/test/java/org/apache/ratis/RaftLogTruncateTests.java
new file mode 100644
index 0000000..80c5774
--- /dev/null
+++ b/ratis-test/src/test/java/org/apache/ratis/RaftLogTruncateTests.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis;
+
+import org.apache.ratis.RaftTestUtil.SimpleMessage;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.client.RaftClientConfigKeys;
+import org.apache.ratis.client.impl.OrderedAsync;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.impl.MiniRaftCluster;
+import org.apache.ratis.server.raftlog.segmented.SegmentedRaftLog;
+import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.statemachine.impl.SimpleStateMachine4Testing;
+import org.apache.ratis.util.Slf4jUtils;
+import org.apache.ratis.util.TimeDuration;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.event.Level;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.ratis.RaftTestUtil.waitForLeader;
+
+public abstract class RaftLogTruncateTests<CLUSTER extends MiniRaftCluster> extends BaseTest
+ implements MiniRaftCluster.Factory.Get<CLUSTER> {
+ public static final int NUM_SERVERS = 5;
+ final TimeDuration MIN_TIMEOUT = TimeDuration.valueOf(3, TimeUnit.SECONDS);
+
+ static SimpleMessage[] arraycopy(SimpleMessage[] src1, SimpleMessage[] src2) {
+ final SimpleMessage[] dst = new SimpleMessage[src1.length + src2.length];
+ System.arraycopy(src1, 0, dst, 0, src1.length);
+ System.arraycopy(src2, 0, dst, src1.length, src2.length);
+ return dst;
+ }
+
+ {
+ Slf4jUtils.setLogLevel(OrderedAsync.LOG, Level.ERROR);
+ Slf4jUtils.setLogLevel(RaftServerConfigKeys.LOG, Level.ERROR);
+ Slf4jUtils.setLogLevel(RaftClientConfigKeys.LOG, Level.ERROR);
+
+ final RaftProperties p = getProperties();
+ p.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY, SimpleStateMachine4Testing.class, StateMachine.class);
+
+ // set a long rpc timeout so, when the leader does not have the majority, it won't step down fast.
+ RaftServerConfigKeys.Rpc.setTimeoutMin(p, MIN_TIMEOUT);
+ RaftServerConfigKeys.Rpc.setTimeoutMax(p, MIN_TIMEOUT.multiply(2));
+ RaftServerConfigKeys.Rpc.setFirstElectionTimeoutMin(p, TimeDuration.ONE_SECOND);
+ RaftServerConfigKeys.Rpc.setFirstElectionTimeoutMax(p, TimeDuration.ONE_SECOND.multiply(2));
+ }
+
+ @Override
+ public int getGlobalTimeoutSeconds() {
+ return 200;
+ }
+
+ @Test
+ public void testLogTruncate() throws Exception {
+ runWithNewCluster(NUM_SERVERS, this::runTestLogTruncate);
+ }
+
+ void runTestLogTruncate(MiniRaftCluster cluster) throws Exception {
+ final RaftServer.Division oldLeader = waitForLeader(cluster);
+ final List<RaftServer.Division> oldFollowers = cluster.getFollowers();
+ final List<RaftPeerId> killedPeers = new ArrayList<>();
+ final List<RaftPeerId> remainingPeers = new ArrayList<>();
+
+ final int majorityIndex = NUM_SERVERS / 2 + 1;
+ Assert.assertEquals(NUM_SERVERS - 1, oldFollowers.size());
+ Assert.assertTrue(majorityIndex < oldFollowers.size());
+
+ for (int i = 0; i < majorityIndex; i++) {
+ killedPeers.add(oldFollowers.get(i).getId());
+ }
+ remainingPeers.add(oldLeader.getId());
+ for (int i = majorityIndex; i < oldFollowers.size(); i++) {
+ remainingPeers.add(oldFollowers.get(i).getId());
+ }
+
+ try {
+ runTestLogTruncate(cluster, oldLeader, killedPeers, remainingPeers);
+ } catch (Throwable e) {
+ LOG.info("killedPeers : {}", killedPeers);
+ LOG.info("remainingPeers: {}", remainingPeers);
+ throw e;
+ }
+ }
+
+ void runTestLogTruncate(MiniRaftCluster cluster, RaftServer.Division oldLeader,
+ List<RaftPeerId> killedPeers, List<RaftPeerId> remainingPeers) throws Exception {
+ final List<Throwable> exceptions = Collections.synchronizedList(new ArrayList<>());
+ final long oldLeaderTerm = oldLeader.getInfo().getCurrentTerm();
+ LOG.info("oldLeader: {}, term={}", oldLeader.getId(), oldLeaderTerm);
+
+ final SimpleMessage[] firstBatch = SimpleMessage.create(5, "first");
+ final SimpleMessage[] secondBatch = SimpleMessage.create(4, "second");
+
+ try (final RaftClient client = cluster.createClient(oldLeader.getId())) {
+ // send some messages
+ for (SimpleMessage batch : firstBatch) {
+ final RaftClientReply reply = client.io().send(batch);
+ Assert.assertTrue(reply.isSuccess());
+ }
+ for (RaftServer.Division f : cluster.getFollowers()) {
+ assertLogEntries(f, oldLeaderTerm, firstBatch);
+ }
+
+ // kill a majority of followers
+ LOG.info("Before killServer {}: {}", killedPeers, cluster.printServers());
+ for (RaftPeerId f : killedPeers) {
+ cluster.killServer(f);
+ }
+ LOG.info("After killServer {}: {}", killedPeers, cluster.printServers());
+
+ // send more messages, but they won't be committed due to not enough followers
+ final SimpleMessage[] messagesToBeTruncated = SimpleMessage.create(3, "messagesToBeTruncated");
+ final AtomicBoolean done = new AtomicBoolean();
+ for (SimpleMessage message : messagesToBeTruncated) {
+ client.async().send(message).whenComplete((r, e) -> {
+ if (!done.get()) {
+ exceptions.add(new IllegalStateException(message + " is completed: reply=" + r, e));
+ }
+ });
+ }
+
+ // check log messages
+ final SimpleMessage[] expectedMessages = arraycopy(firstBatch, messagesToBeTruncated);
+ for (RaftPeerId f : remainingPeers) {
+ assertLogEntries(cluster.getDivision(f), oldLeaderTerm, expectedMessages);
+ }
+ done.set(true);
+ LOG.info("done");
+ }
+
+ // kill the remaining servers
+ LOG.info("Before killServer {}: {}", remainingPeers, cluster.printServers());
+ for (RaftPeerId f : remainingPeers) {
+ cluster.killServer(f);
+ }
+ LOG.info("After killServer {}: {}", remainingPeers, cluster.printServers());
+
+ // restart the earlier followers
+ for (RaftPeerId f : killedPeers) {
+ cluster.restartServer(f, false);
+ }
+
+ // The new leader should be one of the earlier followers
+ final RaftServer.Division newLeader = waitForLeader(cluster);
+ LOG.info("After restartServer {}: {}", killedPeers, cluster.printServers());
+ final long newLeaderTerm = newLeader.getInfo().getCurrentTerm();
+
+ final SegmentedRaftLog newLeaderLog = (SegmentedRaftLog) newLeader.getRaftLog();
+ LOG.info("newLeader: {}, term {}, last={}", newLeader.getId(), newLeaderTerm,
+ newLeaderLog.getLastEntryTermIndex());
+ Assert.assertTrue(killedPeers.contains(newLeader.getId()));
+
+ // restart the remaining servers
+ for (RaftPeerId f : remainingPeers) {
+ cluster.restartServer(f, false);
+ }
+
+ // check RaftLog truncate
+ for (RaftPeerId f : remainingPeers) {
+ assertLogEntries(cluster.getDivision(f), oldLeaderTerm, firstBatch);
+ }
+
+ try (final RaftClient client = cluster.createClient(newLeader.getId())) {
+ // send more messages
+ for (SimpleMessage batch : secondBatch) {
+ final RaftClientReply reply = client.io().send(batch);
+ Assert.assertTrue(reply.isSuccess());
+ }
+ }
+
+ // check log messages -- it should be truncated and then append the new messages
+ final SimpleMessage[] expectedMessages = arraycopy(firstBatch, secondBatch);
+ for (RaftPeerId f : killedPeers) {
+ assertLogEntries(cluster.getDivision(f), oldLeaderTerm, expectedMessages);
+ }
+
+ if (!exceptions.isEmpty()) {
+ LOG.info("{} exceptions", exceptions.size());
+ for(int i = 0 ; i < exceptions.size(); i++) {
+ LOG.info("exception {})", i, exceptions.get(i));
+ }
+ Assert.fail();
+ }
+ }
+
+ private void assertLogEntries(RaftServer.Division server, long term, SimpleMessage[] expectedMessages)
+ throws Exception {
+ RaftTestUtil.assertLogEntries(server, term, expectedMessages, 30, LOG);
+ }
+}
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftLogTruncateWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftLogTruncateWithGrpc.java
new file mode 100644
index 0000000..dc28463
--- /dev/null
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftLogTruncateWithGrpc.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc;
+
+import org.apache.ratis.RaftLogTruncateTests;
+
+public class TestRaftLogTruncateWithGrpc extends RaftLogTruncateTests<MiniRaftClusterWithGrpc>
+ implements MiniRaftClusterWithGrpc.FactoryGet {
+}
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
index bc0061f..046453d 100644
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
@@ -74,12 +74,6 @@
}
@Test
- public void testStateMachineMetrics() throws Exception {
- runWithNewCluster(NUM_SERVERS, cluster ->
- testStateMachineMetrics(false, cluster, LOG));
- }
-
- @Test
public void testUpdateViaHeartbeat() throws Exception {
runWithNewCluster(NUM_SERVERS, this::runTestUpdateViaHeartbeat);
}
diff --git a/ratis-test/src/test/java/org/apache/ratis/retry/TestExceptionDependentRetry.java b/ratis-test/src/test/java/org/apache/ratis/retry/TestExceptionDependentRetry.java
index 264db89..36e6dfb 100644
--- a/ratis-test/src/test/java/org/apache/ratis/retry/TestExceptionDependentRetry.java
+++ b/ratis-test/src/test/java/org/apache/ratis/retry/TestExceptionDependentRetry.java
@@ -154,7 +154,7 @@
long sleepTime) {
for (int i = 0; i < retries + 1; i++) {
RetryPolicy.Action action = exceptionDependentRetry
- .handleAttemptFailure(new ClientRetryEvent(i, null, exception));
+ .handleAttemptFailure(TestRetryPolicy.newClientRetryEvent(i, null, exception));
final boolean expected = i < retries && i < maxAttempts;
Assert.assertEquals(expected, action.shouldRetry());
diff --git a/ratis-test/src/test/java/org/apache/ratis/retry/TestRetryPolicy.java b/ratis-test/src/test/java/org/apache/ratis/retry/TestRetryPolicy.java
index d69cd1a..1b9536b 100644
--- a/ratis-test/src/test/java/org/apache/ratis/retry/TestRetryPolicy.java
+++ b/ratis-test/src/test/java/org/apache/ratis/retry/TestRetryPolicy.java
@@ -33,6 +33,7 @@
import org.apache.ratis.protocol.exceptions.TimeoutIOException;
import org.apache.ratis.protocol.exceptions.ResourceUnavailableException;
import org.apache.ratis.util.TimeDuration;
+import org.apache.ratis.util.Timestamp;
import org.junit.Assert;
import org.junit.Test;
@@ -70,6 +71,10 @@
}
}
+ static ClientRetryEvent newClientRetryEvent(int attemptCount, RaftClientRequest request, Throwable cause) {
+ return new ClientRetryEvent(attemptCount, request, attemptCount, cause, Timestamp.currentTime());
+ }
+
@Test
public void testRequestTypeDependentRetry() {
final RequestTypeDependentRetryPolicy.Builder b = RequestTypeDependentRetryPolicy.newBuilder();
@@ -88,7 +93,7 @@
RaftClientRequest.watchRequestType(1, ReplicationLevel.MAJORITY));
for(int i = 1; i < 2*n; i++) {
{ //write
- final ClientRetryEvent event = new ClientRetryEvent(i, writeRequest, null);
+ final ClientRetryEvent event = newClientRetryEvent(i, writeRequest, null);
final RetryPolicy.Action action = policy.handleAttemptFailure(event);
final boolean expected = i < n;
@@ -101,21 +106,21 @@
}
{ //read and stale read are using default
- final ClientRetryEvent event = new ClientRetryEvent(i, readRequest, null);
+ final ClientRetryEvent event = newClientRetryEvent(i, readRequest, null);
final RetryPolicy.Action action = policy.handleAttemptFailure(event);
Assert.assertTrue(action.shouldRetry());
Assert.assertEquals(0L, action.getSleepTime().getDuration());
}
{
- final ClientRetryEvent event = new ClientRetryEvent(i, staleReadRequest, null);
+ final ClientRetryEvent event = newClientRetryEvent(i, staleReadRequest, null);
final RetryPolicy.Action action = policy.handleAttemptFailure(event);
Assert.assertTrue(action.shouldRetry());
Assert.assertEquals(0L, action.getSleepTime().getDuration());
}
{ //watch has no retry
- final ClientRetryEvent event = new ClientRetryEvent(i, watchRequest, null);
+ final ClientRetryEvent event = newClientRetryEvent(i, watchRequest, null);
final RetryPolicy.Action action = policy.handleAttemptFailure(event);
Assert.assertFalse(action.shouldRetry());
Assert.assertEquals(0L, action.getSleepTime().getDuration());
@@ -148,7 +153,7 @@
};
for (RaftClientRequest request : requests) {
- final ClientRetryEvent event = new ClientRetryEvent(request, new Exception(), pending);
+ final ClientRetryEvent event = pending.newClientRetryEvent(request, new Exception());
final RetryPolicy.Action action = policy.handleAttemptFailure(event);
Assert.assertTrue(action.shouldRetry());
Assert.assertEquals(0L, action.getSleepTime().getDuration());
@@ -156,7 +161,7 @@
timeout.sleep();
for (RaftClientRequest request : requests) {
- final ClientRetryEvent event = new ClientRetryEvent(request, new Exception(), pending);
+ final ClientRetryEvent event = pending.newClientRetryEvent(request, new Exception());
final RetryPolicy.Action action = policy.handleAttemptFailure(event);
Assert.assertFalse(action.shouldRetry());
}
@@ -218,8 +223,7 @@
*/
private void checkEvent(int exceptionAttemptCount, RetryPolicy retryPolicy, RaftClientRequest raftClientRequest,
Throwable exception, Pair exceptionPolicyPair) {
- final ClientRetryEvent event =
- new ClientRetryEvent(exceptionAttemptCount, raftClientRequest, exception);
+ final ClientRetryEvent event = newClientRetryEvent(exceptionAttemptCount, raftClientRequest, exception);
final RetryPolicy.Action action = retryPolicy.handleAttemptFailure(event);
final boolean expected = exceptionAttemptCount < exceptionPolicyPair.retries;
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/ServerRestartTests.java b/ratis-test/src/test/java/org/apache/ratis/server/ServerRestartTests.java
index 73ff1eb..2f3edf7 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/ServerRestartTests.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/ServerRestartTests.java
@@ -261,7 +261,7 @@
final RaftPeerId leaderId = leader.getId();
ids.add(leaderId);
- RaftTestUtil.getStateMachineLogEntries(leaderLog);
+ RaftTestUtil.getStateMachineLogEntries(leaderLog, LOG::info);
// check that the last metadata entry is written to the log
JavaUtils.attempt(() -> assertLastLogEntry(leader), 20, HUNDRED_MILLIS, "leader last metadata entry", LOG);