RATIS-1942. GrpcLogAppender has `ILLEGAL TRANSITION: STARTING -> STARTING` (#994)
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/LifeCycle.java b/ratis-common/src/main/java/org/apache/ratis/util/LifeCycle.java
index d543d58..9870fe3 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/LifeCycle.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/LifeCycle.java
@@ -160,16 +160,20 @@
/** Transition from the current state to the given state. */
public void transition(final State to) {
- final State from = current.getAndSet(to);
- State.validate(name, from, to);
+ current.updateAndGet(from -> {
+ State.validate(name, from, to);
+ return to;
+ });
}
/** Transition from the current state to the given state if the current state is not equal to the given state. */
public void transitionIfNotEqual(final State to) {
- final State from = current.getAndSet(to);
- if (from != to) {
- State.validate(name, from, to);
- }
+ current.updateAndGet(from -> {
+ if (from != to) {
+ State.validate(name, from, to);
+ }
+ return to;
+ });
}
/**
@@ -226,11 +230,14 @@
* @return true iff the current state is equal to the specified from state.
*/
public boolean compareAndTransition(final State from, final State to) {
- if (current.compareAndSet(from, to)) {
+ final State previous = current.getAndUpdate(state -> {
+ if (state != from) {
+ return state;
+ }
State.validate(name, from, to);
- return true;
- }
- return false;
+ return to;
+ });
+ return previous == from;
}
/** @return the current state. */
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
index 3928645..dd4e199 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
@@ -62,7 +62,7 @@
public GrpcServerProtocolClient(RaftPeer target, int flowControlWindow,
TimeDuration requestTimeout, GrpcTlsConfig tlsConfig, boolean separateHBChannel) {
raftPeerId = target.getId();
- LOG.info("Build channel for {}", raftPeerId);
+ LOG.info("Build channel for {}", target);
useSeparateHBChannel = separateHBChannel;
channel = buildChannel(target, flowControlWindow, tlsConfig);
blockingStub = RaftServerProtocolServiceGrpc.newBlockingStub(channel);
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
index 35916db..795ca6d 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
@@ -250,27 +250,27 @@
public static RaftGroup initRaftGroup(Collection<String> ids, Collection<String> listenerIds) {
Iterator<InetSocketAddress> addresses = NetUtils.createLocalServerAddress(4 * (ids.size() + listenerIds.size())).iterator();
Stream<RaftPeer> peer = ids.stream()
- .map(RaftPeerId::valueOf)
- .map(id -> RaftPeer.newBuilder().setId(id)
- .setAddress(addresses.next())
- .setAdminAddress(addresses.next())
- .setClientAddress(addresses.next())
- .setDataStreamAddress(addresses.next())
- .build());
+ .map(id -> RaftPeer.newBuilder().setId(id))
+ .map(p -> assignAddresses(p, addresses))
+ .map(RaftPeer.Builder::build);
Stream<RaftPeer> listener = listenerIds.stream()
- .map(RaftPeerId::valueOf)
- .map(id -> RaftPeer.newBuilder().setId(id)
- .setAddress(addresses.next())
- .setAdminAddress(addresses.next())
- .setClientAddress(addresses.next())
- .setDataStreamAddress(addresses.next())
- .setStartupRole(RaftProtos.RaftPeerRole.LISTENER)
- .build());
+ .map(id -> RaftPeer.newBuilder().setId(id))
+ .map(p -> assignAddresses(p, addresses))
+ .map(p -> p.setStartupRole(RaftProtos.RaftPeerRole.LISTENER))
+ .map(RaftPeer.Builder::build);
final RaftPeer[] peers = Stream.concat(peer, listener).toArray(RaftPeer[]::new);
return RaftGroup.valueOf(RaftGroupId.randomId(), peers);
}
+ private static RaftPeer.Builder assignAddresses(RaftPeer.Builder builder, Iterator<InetSocketAddress> addresses) {
+ return builder
+ .setAddress(addresses.next())
+ .setAdminAddress(addresses.next())
+ .setClientAddress(addresses.next())
+ .setDataStreamAddress(addresses.next());
+ }
+
private final Supplier<File> rootTestDir = JavaUtils.memoize(
() -> new File(BaseTest.getRootTestDir(),
JavaUtils.getClassSimpleName(getClass()) + Integer.toHexString(ThreadLocalRandom.current().nextInt())));
@@ -468,10 +468,13 @@
if (emptyPeer) {
raftGroup = RaftGroup.valueOf(group.getGroupId(), Collections.emptyList());
} else {
+ Iterator<InetSocketAddress> addresses = NetUtils.createLocalServerAddress(4 * ids.length).iterator();
final Collection<RaftPeer> newPeers = StreamSupport.stream(peerIds.spliterator(), false)
.map(id -> RaftPeer.newBuilder().setId(id)
- .setStartupRole(startRole)
- .build()).collect(Collectors.toSet());
+ .setStartupRole(startRole))
+ .map(p -> assignAddresses(p, addresses))
+ .map(RaftPeer.Builder::build)
+ .collect(Collectors.toSet());
newPeers.addAll(group.getPeers());
raftGroup = RaftGroup.valueOf(group.getGroupId(), newPeers);
}
diff --git a/ratis-test/src/test/java/org/apache/ratis/util/TestLifeCycle.java b/ratis-test/src/test/java/org/apache/ratis/util/TestLifeCycle.java
index 9782792..3faf2a4 100644
--- a/ratis-test/src/test/java/org/apache/ratis/util/TestLifeCycle.java
+++ b/ratis-test/src/test/java/org/apache/ratis/util/TestLifeCycle.java
@@ -17,10 +17,13 @@
*/
package org.apache.ratis.util;
-import org.junit.Assert;
+import org.apache.ratis.util.function.TriConsumer;
import org.junit.Test;
import static org.apache.ratis.util.LifeCycle.State.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
import java.util.*;
@@ -31,7 +34,7 @@
* while this test uses successors.
*/
@Test(timeout = 1000)
- public void testIsValid() throws Exception {
+ public void testIsValid() {
final Map<LifeCycle.State, List<LifeCycle.State>> successors
= new EnumMap<>(LifeCycle.State.class);
put(NEW, successors, STARTING, CLOSED);
@@ -44,10 +47,52 @@
put(CLOSED, successors);
final List<LifeCycle.State> states = Arrays.asList(LifeCycle.State.values());
- states.stream().forEach(
+ states.forEach(
from -> states.forEach(
- to -> Assert.assertEquals(from + " -> " + to,
+ to -> assertEquals(from + " -> " + to,
successors.get(from).contains(to),
isValid(from, to))));
}
+
+ @Test
+ public void validTransitions() {
+ testValidTransition((from, subject, to) -> assertTrue(subject.compareAndTransition(from, to)));
+ testValidTransition((from, subject, to) -> subject.transition(to));
+ testValidTransition((from, subject, to) -> assertEquals(to, subject.transitionAndGet(any -> to)));
+ testValidTransition((from, subject, to) -> subject.transitionIfNotEqual(to));
+ testValidTransition((from, subject, to) -> assertTrue(subject.transitionIfValid(to)));
+ }
+
+ private static void testValidTransition(TriConsumer<LifeCycle.State, LifeCycle, LifeCycle.State> op) {
+ LifeCycle subject = new LifeCycle("subject");
+ for (LifeCycle.State to : new LifeCycle.State[] { STARTING, RUNNING, PAUSING, PAUSED, CLOSING, CLOSED }) {
+ LifeCycle.State from = subject.getCurrentState();
+ op.accept(from, subject, to);
+ assertEquals(to, subject.getCurrentState());
+ }
+ }
+
+ @Test
+ public void invalidTransitions() {
+ testInvalidTransition((from, subject, to) -> subject.compareAndTransition(from, to), true);
+ testInvalidTransition((from, subject, to) -> subject.transition(to), true);
+ testInvalidTransition((from, subject, to) -> subject.transitionIfNotEqual(to), true);
+ testInvalidTransition((from, subject, to) -> assertFalse(subject.transitionIfValid(to)), false);
+ testInvalidTransition((from, subject, to) -> subject.transitionAndGet(any -> to), true);
+ }
+
+ private static void testInvalidTransition(TriConsumer<LifeCycle.State, LifeCycle, LifeCycle.State> op, boolean shouldThrow) {
+ LifeCycle subject = new LifeCycle("subject");
+ for (LifeCycle.State to : new LifeCycle.State[] { RUNNING, EXCEPTION, CLOSING }) {
+ LifeCycle.State from = subject.getCurrentState();
+ try {
+ op.accept(from, subject, to);
+ assertFalse(shouldThrow);
+ } catch (IllegalStateException e) {
+ assertTrue(shouldThrow);
+ assertEquals("Should be in original state", from, subject.getCurrentState());
+ }
+ }
+ }
+
}