IGNITE-14575 Write to DMS must throw error, if client is not in topology - Fixes #9012.

Signed-off-by: Sergey Chugunov <sergey.chugunov@gmail.com>
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
index e123278..f63b861 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
@@ -357,8 +357,9 @@
             grid.starterThread.interrupt();
         }
 
-        if (grid != null && grid.state() == STARTED) {
-            grid.stop(cancel, shutdown);
+        if (grid != null) {
+            if (grid.state() == STARTED)
+                grid.stop(cancel, shutdown);
 
             boolean fireEvt;
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index d9ebb48..e7ddd3e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -1866,7 +1866,7 @@
         return discoCache().allNodes();
     }
 
-    /** @return all alive server nodes is topology */
+    /** @return all alive server nodes in topology */
     public Collection<ClusterNode> aliveServerNodes() {
         return discoCache().aliveServerNodes();
     }
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index 55adeda..794382de 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -507,12 +507,12 @@
     @Override public void sendCustomEvent(DiscoverySpiCustomMessage evt) {
         State state = this.state;
 
-        if (state == SEGMENTED)
-            throw new IgniteException("Failed to send custom message: client is segmented.");
-
         if (state == DISCONNECTED)
             throw new IgniteClientDisconnectedException(null, "Failed to send custom message: client is disconnected.");
 
+        if (state == STOPPED || state == SEGMENTED || state == STARTING)
+            throw new IgniteException("Failed to send custom message: client is " + state.name().toLowerCase() + ".");
+
         try {
             TcpDiscoveryCustomEventMessage msg;
 
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStorageTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStorageTest.java
index 298e202..a02ae06 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStorageTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStorageTest.java
@@ -21,10 +21,12 @@
 import java.util.Arrays;
 import java.util.Comparator;
 import java.util.UUID;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.configuration.DataRegionConfiguration;
 import org.apache.ignite.configuration.DataStorageConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
@@ -32,6 +34,8 @@
 import org.apache.ignite.failure.StopNodeFailureHandler;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.IgnitionEx;
 import org.apache.ignite.internal.NodeStoppingException;
 import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageImpl;
 import org.apache.ignite.internal.util.typedef.X;
@@ -113,7 +117,7 @@
     /** */
     @After
     public void after() throws Exception {
-        stopAllGrids();
+        stopAllGrids(true, false);
     }
 
     /**
@@ -152,6 +156,48 @@
     }
 
     /**
+     * Test verifies that Distributed Metastorage on client yields error if client is not connected to some cluster.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testDistributedMetastorageOperationsOnClient() throws Exception {
+        String clientName = "client0";
+
+        String key = "key";
+        String value = "value";
+
+        GridTestUtils.runAsync(() -> startClientGrid(clientName));
+
+        boolean dmsStarted = GridTestUtils.waitForCondition(() -> {
+            try {
+                IgniteKernal clientGrid = IgnitionEx.gridx(clientName);
+
+                return clientGrid != null && clientGrid.context().distributedMetastorage() != null;
+            }
+            catch (Exception ignored) {
+                return false;
+            }
+        }, 20_000);
+
+        assertTrue(dmsStarted);
+
+        IgniteKernal cl0 = IgnitionEx.gridx("client0");
+
+        final DistributedMetaStorage clDms = cl0.context().distributedMetastorage();
+
+        assertNotNull(clDms);
+
+        GridTestUtils.assertThrows(null, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                clDms.write(key, value);
+
+                return null;
+            }
+        }, IgniteCheckedException.class, null);
+    }
+
+    /**
      * @throws Exception If failed.
      */
     @Test
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
index 8fec5ba..7b2c317 100755
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
@@ -164,6 +164,7 @@
 import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 import static org.apache.ignite.internal.GridKernalState.DISCONNECTED;
+import static org.apache.ignite.internal.IgnitionEx.gridx;
 import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
 import static org.apache.ignite.testframework.GridTestUtils.getFieldValueHierarchy;
 import static org.apache.ignite.testframework.GridTestUtils.setFieldValue;
@@ -1471,13 +1472,33 @@
     }
 
     /**
+     * Stop grid waiting for it to come up
      * @param igniteInstanceName Ignite instance name.
      * @param cancel Cancel flag.
      * @param awaitTop Await topology change flag.
      */
     protected void stopGrid(@Nullable String igniteInstanceName, boolean cancel, boolean awaitTop) {
+        stopGrid0(igniteInstanceName, cancel, awaitTop, false);
+    }
+
+    /**
+     * Stop grid without waiting for it to come up
+     * @param igniteInstanceName Ignite instance name.
+     * @param cancel Cancel flag.
+     * @param awaitTop Await topology change flag.
+     */
+    protected void stopGridx(@Nullable String igniteInstanceName, boolean cancel, boolean awaitTop) {
+        stopGrid0(igniteInstanceName, cancel, awaitTop, true);
+    }
+
+    /**
+     * @param igniteInstanceName Ignite instance name.
+     * @param cancel Cancel flag.
+     * @param awaitTop Await topology change flag.
+     */
+    private void stopGrid0(@Nullable String igniteInstanceName, boolean cancel, boolean awaitTop, boolean stopNotStarted) {
         try {
-            IgniteEx ignite = grid(igniteInstanceName);
+            IgniteEx ignite = stopNotStarted ? gridx(igniteInstanceName) : grid(igniteInstanceName);
 
             assert ignite != null : "Ignite returned null grid for name: " + igniteInstanceName;
 
@@ -1489,7 +1510,7 @@
                 IgniteUtils.setCurrentIgniteName(igniteInstanceName);
 
                 try {
-                    G.stop(igniteInstanceName, cancel);
+                    IgnitionEx.stop(igniteInstanceName, cancel, null, stopNotStarted);
                 }
                 finally {
                     IgniteUtils.setCurrentIgniteName(null);
@@ -1519,14 +1540,24 @@
     }
 
     /**
+     * Stop all grids waiting for them to start
      * @param cancel Cancel flag.
      */
     protected void stopAllGrids(boolean cancel) {
+        stopAllGrids(cancel, true);
+    }
+
+    /**
+     * Stop all grids
+     * @param cancel Cancel flag.
+     * @param wait Wait for grids to start first.
+     */
+    protected void stopAllGrids(boolean cancel, boolean wait) {
         try {
             Collection<Ignite> clients = new ArrayList<>();
             Collection<Ignite> srvs = new ArrayList<>();
 
-            for (Ignite g : G.allGrids()) {
+            for (Ignite g : wait ? G.allGrids() : IgnitionEx.allGridsx()) {
                 if (g.configuration().getDiscoverySpi().isClientMode())
                     clients.add(g);
                 else
@@ -1534,10 +1565,16 @@
             }
 
             for (Ignite g : clients)
-                stopGrid(g.name(), cancel, false);
+                if (wait)
+                    stopGrid(g.name(), cancel, false);
+                else
+                    stopGridx(g.name(), cancel, false);
 
             for (Ignite g : srvs)
-                stopGrid(g.name(), cancel, false);
+                if (wait)
+                    stopGrid(g.name(), cancel, false);
+                else
+                    stopGridx(g.name(), cancel, false);
 
             List<Ignite> nodes = G.allGrids();
 
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
index 279aa1d..4eb0a3a 100644
--- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
@@ -668,6 +668,20 @@
     public void sendCustomMessage(DiscoverySpiCustomMessage msg) {
         assert msg != null;
 
+        List<ClusterNode> nodes = rtState.top.topologySnapshot();
+
+        boolean hasServerNode = false;
+
+        for (int i = 0, size = nodes.size(); i < size; i++) {
+            ClusterNode node = nodes.get(i);
+
+            if (!node.isClient())
+                hasServerNode = true;
+        }
+
+        if (!hasServerNode)
+            throw new IgniteException("Failed to send custom message: no server nodes in topology.");
+
         byte[] msgBytes;
 
         try {