IGNITE-7639 Fixed NPE
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/DiscoveryDataClusterState.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/DiscoveryDataClusterState.java
index dea2ce7..66e56b9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/DiscoveryDataClusterState.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/DiscoveryDataClusterState.java
@@ -28,22 +28,36 @@
import org.jetbrains.annotations.Nullable;
/**
- * Discovery data related to cluster state.
+ * A pojo-object representing current cluster global state. The state includes cluster active flag and cluster
+ * baseline topology.
+ * <p>
+ * This object also captures a transitional cluster state, when one or more fields are changing. In this case,
+ * a {@code transitionReqId} field is set to a non-null value and {@code prevState} captures previous cluster state.
+ * A joining node catching the cluster in an intermediate state will observe {@code transitionReqId} field to be
+ * non-null, however the {@code prevState} will not be sent to the joining node.
+ *
+ * TODO https://issues.apache.org/jira/browse/IGNITE-7640 This class must be immutable, transitionRes must be set by calling finish().
*/
public class DiscoveryDataClusterState implements Serializable {
/** */
private static final long serialVersionUID = 0L;
- /** */
+ /** Flag indicating if the cluster in in active state. */
private final boolean active;
- /** */
+ /** Current cluster baseline topology. */
@Nullable private final BaselineTopology baselineTopology;
- /** */
+ /**
+ * Transition request ID. Set to a non-null value if the cluster is changing it's state.
+ * The ID is assigned on the initiating node.
+ */
private final UUID transitionReqId;
- /** Topology version for state change exchange. */
+ /**
+ * Topology version in the cluster when state change request was received by the coordinator.
+ * The exchange fired for the cluster state change will be on version {@code transitionTopVer.nextMinorVersion()}.
+ */
@GridToStringInclude
private final AffinityTopologyVersion transitionTopVer;
@@ -51,13 +65,18 @@
@GridToStringExclude
private final Set<UUID> transitionNodes;
- /** Local flag for state transition result (global state is updated asynchronously by custom message). */
+ /**
+ * Local flag for state transition active state result (global state is updated asynchronously by custom message),
+ * {@code null} means that state change is not completed yet.
+ */
private transient volatile Boolean transitionRes;
- /** */
+ /**
+ * Previous cluster state if this state is a transition state and it was not received by a joining node.
+ */
private transient DiscoveryDataClusterState prevState;
- /** */
+ /** Transition result error. */
private transient volatile Exception transitionError;
/**
@@ -86,6 +105,7 @@
assert transitionReqId != null;
assert transitionTopVer != null;
assert !F.isEmpty(transitionNodes) : transitionNodes;
+ assert prevState != null;
return new DiscoveryDataClusterState(
prevState,
@@ -156,7 +176,7 @@
* @return {@code True} if cluster active state change is in progress, {@code false} otherwise.
*/
public boolean activeStateChanging() {
- return transition() && active != prevState.active;
+ return transition() && (prevState == null || prevState.active != active);
}
/**
@@ -202,6 +222,9 @@
}
/**
+ * Creates a non-transitional cluster state. This method effectively cleans all fields identifying the
+ * state as transitional and creates a new state with the state transition result.
+ *
* @param success Transition success status.
* @return Cluster state that finished transition.
*/
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClusterActivateDeactivateTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClusterActivateDeactivateTest.java
index 71718c9..2337329 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClusterActivateDeactivateTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClusterActivateDeactivateTest.java
@@ -37,7 +37,6 @@
import org.apache.ignite.internal.IgniteClientReconnectAbstractTest;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
@@ -49,7 +48,6 @@
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
-import org.apache.ignite.spi.discovery.tcp.TestTcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.testframework.GridTestUtils;
@@ -85,9 +83,6 @@
private boolean testSpi;
/** */
- private boolean testDiscoSpi;
-
- /** */
private boolean testReconnectSpi;
/** */
@@ -104,8 +99,6 @@
spi.setJoinTimeout(2 * 60_000);
}
- else if (testDiscoSpi)
- cfg.setDiscoverySpi(new TestTcpDiscoverySpi());
((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
@@ -220,14 +213,14 @@
}
for (int i = 0; i < srvs + clients; i++)
- assertFalse(ignite(i).active());
+ assertFalse(ignite(i).cluster().active());
- ignite(activateFrom).active(false); // Should be no-op.
+ ignite(activateFrom).cluster().active(false); // Should be no-op.
- ignite(activateFrom).active(true);
+ ignite(activateFrom).cluster().active(true);
for (int i = 0; i < srvs + clients; i++)
- assertTrue(ignite(i).active());
+ assertTrue(ignite(i).cluster().active());
for (int i = 0; i < srvs + clients; i++) {
for (int c = 0; c < 2; c++)
@@ -308,16 +301,14 @@
private void joinWhileActivate1(final boolean startClient, final boolean withNewCache) throws Exception {
IgniteInternalFuture<?> activeFut = startNodesAndBlockStatusChange(2, 0, 0, false);
- IgniteInternalFuture<?> startFut = GridTestUtils.runAsync(new Callable<Void>() {
- @Override public Void call() throws Exception {
- client = startClient;
+ IgniteInternalFuture<?> startFut = GridTestUtils.runAsync((Callable<Void>)() -> {
+ client = startClient;
- ccfgs = withNewCache ? cacheConfigurations2() : cacheConfigurations1();
+ ccfgs = withNewCache ? cacheConfigurations2() : cacheConfigurations1();
- startGrid(2);
+ startGrid(2);
- return null;
- }
+ return null;
});
TestRecordingCommunicationSpi spi1 = TestRecordingCommunicationSpi.spi(ignite(1));
@@ -376,7 +367,7 @@
int minorVer = 1;
if (initiallyActive && persistenceEnabled()) {
- ignite(0).active(true);
+ ignite(0).cluster().active(true);
minorVer++;
}
@@ -396,11 +387,9 @@
blockExchangeSingleMessage(spi, STATE_CHANGE_TOP_VER);
}
- IgniteInternalFuture<?> stateChangeFut = GridTestUtils.runAsync(new Runnable() {
- @Override public void run() {
- ignite(stateChangeFrom).active(!initiallyActive);
- }
- });
+ IgniteInternalFuture<?> stateChangeFut = GridTestUtils.runAsync(() ->
+ ignite(stateChangeFrom).cluster().active(!initiallyActive)
+ );
for (TestRecordingCommunicationSpi spi : spis)
spi.waitForBlocked();
@@ -417,17 +406,15 @@
* @param topVer Exchange topology version.
*/
private void blockExchangeSingleMessage(TestRecordingCommunicationSpi spi, final AffinityTopologyVersion topVer) {
- spi.blockMessages(new IgniteBiPredicate<ClusterNode, Message>() {
- @Override public boolean apply(ClusterNode clusterNode, Message msg) {
- if (msg instanceof GridDhtPartitionsSingleMessage) {
- GridDhtPartitionsSingleMessage pMsg = (GridDhtPartitionsSingleMessage)msg;
+ spi.blockMessages((IgniteBiPredicate<ClusterNode, Message>)(clusterNode, msg) -> {
+ if (msg instanceof GridDhtPartitionsSingleMessage) {
+ GridDhtPartitionsSingleMessage pMsg = (GridDhtPartitionsSingleMessage)msg;
- if (pMsg.exchangeId() != null && pMsg.exchangeId().topologyVersion().equals(topVer))
- return true;
- }
-
- return false;
+ if (pMsg.exchangeId() != null && pMsg.exchangeId().topologyVersion().equals(topVer))
+ return true;
}
+
+ return false;
});
}
@@ -460,16 +447,14 @@
private void joinWhileDeactivate1(final boolean startClient, final boolean withNewCache) throws Exception {
IgniteInternalFuture<?> activeFut = startNodesAndBlockStatusChange(2, 0, 0, true);
- IgniteInternalFuture<?> startFut = GridTestUtils.runAsync(new Callable<Void>() {
- @Override public Void call() throws Exception {
- client = startClient;
+ IgniteInternalFuture<?> startFut = GridTestUtils.runAsync((Callable<Void>)() -> {
+ client = startClient;
- ccfgs = withNewCache ? cacheConfigurations2() : cacheConfigurations1();
+ ccfgs = withNewCache ? cacheConfigurations2() : cacheConfigurations1();
- startGrid(2);
+ startGrid(2);
- return null;
- }
+ return null;
});
TestRecordingCommunicationSpi spi1 = TestRecordingCommunicationSpi.spi(ignite(1));
@@ -481,7 +466,7 @@
checkNoCaches(3);
- ignite(2).active(true);
+ ignite(2).cluster().active(true);
for (int c = 0; c < 2; c++)
checkCache(ignite(2), CACHE_NAME_PREFIX + c, true);
@@ -529,30 +514,26 @@
final CyclicBarrier b = new CyclicBarrier(START_NODES + 1);
- IgniteInternalFuture<?> fut1 = GridTestUtils.runAsync(new Callable<Void>() {
- @Override public Void call() throws Exception {
- b.await();
+ IgniteInternalFuture<Void> fut1 = GridTestUtils.runAsync(() -> {
+ b.await();
- Thread.sleep(ThreadLocalRandom.current().nextLong(100) + 1);
+ U.sleep(ThreadLocalRandom.current().nextLong(100) + 1);
- ignite(0).active(true);
+ ignite(0).cluster().active(true);
- return null;
- }
+ return null;
});
final AtomicInteger nodeIdx = new AtomicInteger(3);
- IgniteInternalFuture<?> fut2 = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
- @Override public Void call() throws Exception {
- int idx = nodeIdx.getAndIncrement();
+ IgniteInternalFuture<Long> fut2 = GridTestUtils.runMultiThreadedAsync((Callable<Void>)() -> {
+ int idx = nodeIdx.getAndIncrement();
- b.await();
+ b.await();
- startGrid(idx);
+ startGrid(idx);
- return null;
- }
+ return null;
}, START_NODES, "start-node");
fut1.get();
@@ -619,19 +600,19 @@
}
if (persistenceEnabled())
- ignite(deactivateFrom).active(true);
+ ignite(deactivateFrom).cluster().active(true);
- ignite(deactivateFrom).active(true); // Should be no-op.
+ ignite(deactivateFrom).cluster().active(true); // Should be no-op.
checkCaches(srvs + clients, CACHES);
for (int i = 0; i < srvs + clients; i++)
- assertTrue(ignite(i).active());
+ assertTrue(ignite(i).cluster().active());
- ignite(deactivateFrom).active(false);
+ ignite(deactivateFrom).cluster().active(false);
for (int i = 0; i < srvs + clients; i++)
- assertFalse(ignite(i).active());
+ assertFalse(ignite(i).cluster().active());
checkNoCaches(srvs + clients);
@@ -648,12 +629,12 @@
checkNoCaches(srvs + clients + 2);
for (int i = 0; i < srvs + clients + 2; i++)
- assertFalse(ignite(i).active());
+ assertFalse(ignite(i).cluster().active());
- ignite(deactivateFrom).active(true);
+ ignite(deactivateFrom).cluster().active(true);
for (int i = 0; i < srvs + clients + 2; i++) {
- assertTrue(ignite(i).active());
+ assertTrue(ignite(i).cluster().active());
checkCache(ignite(i), CU.UTILITY_CACHE_NAME, true);
}
@@ -695,7 +676,7 @@
startWithCaches1(SRVS, CLIENTS);
if (persistenceEnabled())
- ignite(0).active(true);
+ ignite(0).cluster().active(true);
Ignite srv = ignite(0);
Ignite client = ignite(SRVS);
@@ -741,7 +722,7 @@
checkNoCaches(SRVS + CLIENTS);
- ignite(0).active(true);
+ ignite(0).cluster().active(true);
checkCache(client, CU.UTILITY_CACHE_NAME, true);
@@ -789,39 +770,38 @@
IgniteEx client = grid(SRVS);
if (persistenceEnabled())
- ignite(0).active(true);
+ ignite(0).cluster().active(true);
checkCache(client, CU.UTILITY_CACHE_NAME, true);
checkCaches1(SRVS + CLIENTS);
+ // Wait for late affinity assignment to finish.
+ grid(0).context().cache().context().exchange().affinityReadyFuture(
+ new AffinityTopologyVersion(SRVS + CLIENTS, 1)).get();
+
final AffinityTopologyVersion STATE_CHANGE_TOP_VER = new AffinityTopologyVersion(SRVS + CLIENTS + 1, 1);
final TestRecordingCommunicationSpi spi1 = transition ? TestRecordingCommunicationSpi.spi(ignite(1)) : null;
final AtomicReference<IgniteInternalFuture> stateFut = new AtomicReference<>();
- IgniteClientReconnectAbstractTest.reconnectClientNode(log, client, srv, new Runnable() {
- @Override public void run() {
- if (transition) {
- blockExchangeSingleMessage(spi1, STATE_CHANGE_TOP_VER);
+ IgniteClientReconnectAbstractTest.reconnectClientNode(log, client, srv, () -> {
+ if (transition) {
+ blockExchangeSingleMessage(spi1, STATE_CHANGE_TOP_VER);
- stateFut.set(GridTestUtils.runAsync(new Runnable() {
- @Override public void run() {
- srv.active(false);
- }
- }, "deactivate"));
+ stateFut.set(GridTestUtils.runAsync(() -> srv.cluster().active(false),
+ "deactivate"));
- try {
- U.sleep(500);
- }
- catch (Exception e) {
- e.printStackTrace();
- }
+ try {
+ U.sleep(500);
}
- else
- srv.active(false);
+ catch (Exception e) {
+ e.printStackTrace();
+ }
}
+ else
+ srv.cluster().active(false);
});
if (transition) {
@@ -839,11 +819,11 @@
checkNoCaches(SRVS + CLIENTS);
- ignite(0).active(true);
+ ignite(0).cluster().active(true);
checkCache(client, CU.UTILITY_CACHE_NAME, true);
- assertTrue(client.active());
+ assertTrue(client.cluster().active());
checkCaches1(SRVS + CLIENTS);
@@ -900,27 +880,22 @@
final AtomicReference<IgniteInternalFuture> stateFut = new AtomicReference<>();
- IgniteClientReconnectAbstractTest.reconnectClientNode(log, client, srv, new Runnable() {
- @Override public void run() {
- if (transition) {
- blockExchangeSingleMessage(spi1, STATE_CHANGE_TOP_VER);
+ IgniteClientReconnectAbstractTest.reconnectClientNode(log, client, srv, () -> {
+ if (transition) {
+ blockExchangeSingleMessage(spi1, STATE_CHANGE_TOP_VER);
- stateFut.set(GridTestUtils.runAsync(new Runnable() {
- @Override public void run() {
- srv.active(true);
- }
- }, "activate"));
+ stateFut.set(GridTestUtils.runAsync(() -> srv.cluster().active(true),
+ "activate"));
- try {
- U.sleep(500);
- }
- catch (Exception e) {
- e.printStackTrace();
- }
+ try {
+ U.sleep(500);
}
- else
- srv.active(true);
+ catch (Exception e) {
+ e.printStackTrace();
+ }
}
+ else
+ srv.cluster().active(true);
});
if (transition) {
@@ -989,7 +964,7 @@
checkRecordedMessages(false);
- ignite(0).active(true);
+ ignite(0).cluster().active(true);
checkCaches1(SRVS + CLIENTS);
@@ -1033,12 +1008,10 @@
client = false;
// Start one more node while transition is in progress.
- IgniteInternalFuture startFut = GridTestUtils.runAsync(new Callable() {
- @Override public Object call() throws Exception {
- startGrid(8);
+ IgniteInternalFuture<Void> startFut = GridTestUtils.runAsync(() -> {
+ startGrid(8);
- return null;
- }
+ return null;
}, "start-node");
U.sleep(500);
@@ -1061,7 +1034,7 @@
if (!activate) {
checkNoCaches(9);
- ignite(0).active(true);
+ ignite(0).cluster().active(true);
}
checkCaches1(9);
@@ -1092,19 +1065,16 @@
client = false;
// Start more nodes while transition is in progress.
- IgniteInternalFuture startFut1 = GridTestUtils.runAsync(new Callable() {
- @Override public Object call() throws Exception {
- startGrid(8);
+ IgniteInternalFuture<Void> startFut1 = GridTestUtils.runAsync(() -> {
+ startGrid(8);
- return null;
- }
+ return null;
}, "start-node1");
- IgniteInternalFuture startFut2 = GridTestUtils.runAsync(new Callable() {
- @Override public Object call() throws Exception {
- startGrid(9);
- return null;
- }
+ IgniteInternalFuture<Void> startFut2 = GridTestUtils.runAsync(() -> {
+ startGrid(9);
+
+ return null;
}, "start-node2");
U.sleep(500);
@@ -1132,7 +1102,7 @@
if (!activate) {
checkNoCaches(10);
- ignite(0).active(true);
+ ignite(0).cluster().active(true);
}
checkCaches1(10);
@@ -1214,7 +1184,7 @@
((IgniteEx)node).context().state().publicApiActiveState(true);
- GridCacheAdapter cache = ((IgniteKernal)node).context().cache().internalCache(cacheName);
+ GridCacheAdapter cache = ((IgniteEx)node).context().cache().internalCache(cacheName);
if (exp)
assertNotNull("Cache not found [cache=" + cacheName + ", node=" + node.name() + ']', cache);
@@ -1229,7 +1199,7 @@
for (int i = 0; i < nodes; i++) {
grid(i).context().state().publicApiActiveState(true);
- GridCacheProcessor cache = ((IgniteKernal)ignite(i)).context().cache();
+ GridCacheProcessor cache = ((IgniteEx)ignite(i)).context().cache();
assertTrue(cache.caches().isEmpty());
assertTrue(cache.internalCaches().isEmpty());