Merge branch 'cassandra-3.0' into cassandra-3.11
diff --git a/CHANGES.txt b/CHANGES.txt
index 0f0b17a..7297b3d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -2,6 +2,7 @@
  * Creating of a keyspace on insufficient number of replicas should filter out gosspping-only members (CASSANDRA-17759)
  * Only use statically defined subcolumns when determining column definition for supercolumn cell (CASSANDRA-14113)
 Merged from 3.0:
+ * Fix restarting of services on gossipping-only member (CASSANDRA-17752)
  * Fix writetime and ttl functions forbidden for collections instead of multicell columns (CASSANDRA-17628)
  * Supress CVE-2020-7238 (CASSANDRA-17697)
  * Fix issue where frozen maps may not be serialized in the correct order (CASSANDRA-17623)
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 8c08437..6c403d1 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -354,7 +354,7 @@
     {
         if (gossipActive)
         {
-            if (!isNormal())
+            if (!isNormal() && joinRing)
                 throw new IllegalStateException("Unable to stop gossip because the node is not in the normal state. Try to stop the node instead.");
 
             logger.warn("Stopping gossip by operator request");
@@ -4846,7 +4846,7 @@
         if (isShutdown()) // do not rely on operationMode in case it gets changed to decomissioned or other
             throw new IllegalStateException(String.format("Unable to start %s because the node was drained.", service));
 
-        if (!isNormal())
+        if (!isNormal() && joinRing) // if the node is not joining the ring, it is gossipping-only member which is in STARTING state forever
             throw new IllegalStateException(String.format("Unable to start %s because the node is not in the normal state.", service));
     }
 
diff --git a/test/distributed/org/apache/cassandra/distributed/test/GossipTest.java b/test/distributed/org/apache/cassandra/distributed/test/GossipTest.java
index 0cff2b0..25bb973 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/GossipTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/GossipTest.java
@@ -50,9 +50,14 @@
 
 import static net.bytebuddy.matcher.ElementMatchers.named;
 import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
+import static org.apache.cassandra.distributed.action.GossipHelper.withProperty;
 import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
 import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+import static org.apache.cassandra.distributed.api.TokenSupplier.evenlyDistributedTokens;
+import static org.apache.cassandra.distributed.shared.NetworkTopology.singleDcNetworkTopology;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 public class GossipTest extends TestBaseImpl
 {
@@ -266,6 +271,37 @@
         }
     }
 
+    /**
+     * Gossip on a node started with {@code cassandra.join_ring=false} can be stopped and started again.
+     */
+    @Test
+    public void restartGossipOnGossippingOnlyMember() throws Throwable
+    {
+        int initialNodes = 1;
+        int totalNodes = initialNodes + 1; // tokens and topology are sized for the extra node added below
+        IIsolatedExecutor.SerializableCallable<Boolean> gossipRunning = () -> StorageService.instance.isGossipRunning();
+        IIsolatedExecutor.SerializableRunnable stopGossip = () -> StorageService.instance.stopGossiping();
+        IIsolatedExecutor.SerializableRunnable startGossip = () -> StorageService.instance.startGossiping();
+
+        try (Cluster cluster = builder().withNodes(initialNodes)
+                                        .withTokenSupplier(evenlyDistributedTokens(totalNodes, 1))
+                                        .withNodeIdTopology(singleDcNetworkTopology(totalNodes, "dc0", "rack0"))
+                                        .withConfig(c -> c.with(NETWORK, GOSSIP))
+                                        .start())
+        {
+            IInstanceConfig nodeConfig = cluster.newInstanceConfig();
+            IInvokableInstance node = cluster.bootstrap(nodeConfig);
+            withProperty("cassandra.join_ring", Boolean.FALSE.toString(), () -> node.startup(cluster));
+            assertTrue(node.callOnInstance(gossipRunning));
+
+            node.runOnInstance(stopGossip);
+            assertFalse(node.callOnInstance(gossipRunning));
+
+            node.runOnInstance(startGossip);
+            assertTrue(node.callOnInstance(gossipRunning));
+        }
+    }
+
     void assertPendingRangesForPeer(final boolean expectPending, final InetAddress movingAddress, final Cluster cluster)
     {
         for (IInvokableInstance inst : new IInvokableInstance[]{ cluster.get(1), cluster.get(3)})
diff --git a/test/distributed/org/apache/cassandra/distributed/test/NativeProtocolTest.java b/test/distributed/org/apache/cassandra/distributed/test/NativeProtocolTest.java
index 92751ef..a8bf5eb 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/NativeProtocolTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/NativeProtocolTest.java
@@ -26,14 +26,24 @@
 import com.datastax.driver.core.Session;
 import com.datastax.driver.core.SimpleStatement;
 import com.datastax.driver.core.Statement;
+import org.apache.cassandra.distributed.Cluster;
 import org.apache.cassandra.distributed.api.ICluster;
+import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.IIsolatedExecutor;
 import org.apache.cassandra.distributed.impl.RowUtil;
+import org.apache.cassandra.service.StorageService;
 
+import static org.apache.cassandra.distributed.action.GossipHelper.withProperty;
 import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
 import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL;
 import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+import static org.apache.cassandra.distributed.api.TokenSupplier.evenlyDistributedTokens;
 import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
 import static org.apache.cassandra.distributed.shared.AssertUtils.row;
+import static org.apache.cassandra.distributed.shared.NetworkTopology.singleDcNetworkTopology;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 // TODO: this test should be removed after running in-jvm dtests is set up via the shared API repository
 public class NativeProtocolTest extends TestBaseImpl
@@ -78,4 +88,35 @@
             cluster.close();
         }
     }
+
+    /**
+     * The native transport on a node started with {@code cassandra.join_ring=false} can be stopped and started again.
+     */
+    @Test
+    public void restartTransportOnGossippingOnlyMember() throws Throwable
+    {
+        int initialNodes = 1;
+        int totalNodes = initialNodes + 1; // tokens and topology are sized for the extra node added below
+        IIsolatedExecutor.SerializableCallable<Boolean> transportRunning = () -> StorageService.instance.isNativeTransportRunning();
+        IIsolatedExecutor.SerializableRunnable stopTransport = () -> StorageService.instance.stopNativeTransport();
+        IIsolatedExecutor.SerializableRunnable startTransport = () -> StorageService.instance.startNativeTransport();
+
+        try (Cluster cluster = builder().withNodes(initialNodes)
+                                        .withTokenSupplier(evenlyDistributedTokens(totalNodes, 1))
+                                        .withNodeIdTopology(singleDcNetworkTopology(totalNodes, "dc0", "rack0"))
+                                        .withConfig(c -> c.with(NETWORK, GOSSIP, NATIVE_PROTOCOL))
+                                        .start())
+        {
+            IInstanceConfig nodeConfig = cluster.newInstanceConfig();
+            IInvokableInstance node = cluster.bootstrap(nodeConfig);
+            withProperty("cassandra.join_ring", Boolean.FALSE.toString(), () -> node.startup(cluster));
+            assertTrue(node.callOnInstance(transportRunning));
+
+            node.runOnInstance(stopTransport);
+            assertFalse(node.callOnInstance(transportRunning));
+
+            node.runOnInstance(startTransport);
+            assertTrue(node.callOnInstance(transportRunning));
+        }
+    }
 }
\ No newline at end of file
diff --git a/test/distributed/org/apache/cassandra/distributed/test/ThriftClientTest.java b/test/distributed/org/apache/cassandra/distributed/test/ThriftClientTest.java
deleted file mode 100644
index 0548e47..0000000
--- a/test/distributed/org/apache/cassandra/distributed/test/ThriftClientTest.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.distributed.test;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collections;
-
-import org.junit.Test;
-
-import org.apache.cassandra.db.marshal.CompositeType;
-import org.apache.cassandra.distributed.Cluster;
-import org.apache.cassandra.distributed.api.ConsistencyLevel;
-import org.apache.cassandra.distributed.api.Feature;
-import org.apache.cassandra.distributed.api.QueryResults;
-import org.apache.cassandra.distributed.api.SimpleQueryResult;
-import org.apache.cassandra.distributed.shared.AssertUtils;
-import org.apache.cassandra.thrift.Column;
-import org.apache.cassandra.thrift.ColumnOrSuperColumn;
-import org.apache.cassandra.thrift.Mutation;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.thrift.TException;
-
-public class ThriftClientTest extends TestBaseImpl
-{
-    @Test
-    public void writeThenReadCQL() throws IOException, TException
-    {
-        try (Cluster cluster = init(Cluster.build(1).withConfig(c -> c.with(Feature.NATIVE_PROTOCOL)).start()))
-        {
-            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, value int, PRIMARY KEY (pk))");
-
-            ThriftClientUtils.thriftClient(cluster.get(1), thrift -> {
-                thrift.set_keyspace(KEYSPACE);
-                Mutation mutation = new Mutation();
-                ColumnOrSuperColumn csoc = new ColumnOrSuperColumn();
-                Column column = new Column();
-                column.setName(CompositeType.build(ByteBufferUtil.bytes("value")));
-                column.setValue(ByteBufferUtil.bytes(0));
-                column.setTimestamp(System.currentTimeMillis());
-                csoc.setColumn(column);
-                mutation.setColumn_or_supercolumn(csoc);
-
-                thrift.batch_mutate(Collections.singletonMap(ByteBufferUtil.bytes(0),
-                                                             Collections.singletonMap("tbl", Arrays.asList(mutation))),
-                                    org.apache.cassandra.thrift.ConsistencyLevel.ALL);
-            });
-
-            SimpleQueryResult qr = cluster.coordinator(1).executeWithResult("SELECT * FROM " + KEYSPACE + ".tbl", ConsistencyLevel.ALL);
-            AssertUtils.assertRows(qr, QueryResults.builder().row(0, 0).build());
-        }
-    }
-}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/ThriftTest.java b/test/distributed/org/apache/cassandra/distributed/test/ThriftTest.java
new file mode 100644
index 0000000..cde1bdd
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/ThriftTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+
+import org.junit.Test;
+
+import org.apache.cassandra.db.marshal.CompositeType;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.IIsolatedExecutor;
+import org.apache.cassandra.distributed.api.QueryResults;
+import org.apache.cassandra.distributed.api.SimpleQueryResult;
+import org.apache.cassandra.distributed.shared.AssertUtils;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.thrift.Column;
+import org.apache.cassandra.thrift.ColumnOrSuperColumn;
+import org.apache.cassandra.thrift.Mutation;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.thrift.TException;
+
+import static org.apache.cassandra.distributed.action.GossipHelper.withProperty;
+import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL;
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+import static org.apache.cassandra.distributed.api.TokenSupplier.evenlyDistributedTokens;
+import static org.apache.cassandra.distributed.shared.NetworkTopology.singleDcNetworkTopology;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class ThriftTest extends TestBaseImpl
+{
+    /** Applies one mutation through {@code ThriftClientUtils.thriftClient} and expects a CQL read to return row (0, 0). */
+    @Test
+    public void writeThenReadCQL() throws IOException, TException
+    {
+        try (Cluster cluster = init(Cluster.build(1).withConfig(c -> c.with(Feature.NATIVE_PROTOCOL)).start()))
+        {
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, value int, PRIMARY KEY (pk))");
+
+            ThriftClientUtils.thriftClient(cluster.get(1), client -> {
+                client.set_keyspace(KEYSPACE);
+                Column cell = new Column();
+                cell.setName(CompositeType.build(ByteBufferUtil.bytes("value")));
+                cell.setValue(ByteBufferUtil.bytes(0));
+                cell.setTimestamp(System.currentTimeMillis());
+                ColumnOrSuperColumn holder = new ColumnOrSuperColumn();
+                holder.setColumn(cell);
+                Mutation write = new Mutation();
+                write.setColumn_or_supercolumn(holder);
+                client.batch_mutate(Collections.singletonMap(ByteBufferUtil.bytes(0),
+                                                             Collections.singletonMap("tbl", Arrays.asList(write))),
+                                    org.apache.cassandra.thrift.ConsistencyLevel.ALL);
+            });
+
+            SimpleQueryResult actual = cluster.coordinator(1).executeWithResult("SELECT * FROM " + KEYSPACE + ".tbl", ConsistencyLevel.ALL);
+            AssertUtils.assertRows(actual, QueryResults.builder().row(0, 0).build());
+        }
+    }
+
+    /**
+     * The RPC server on a node started with {@code cassandra.join_ring=false} can be stopped and started again.
+     */
+    @Test
+    public void restartThriftOnGossippingOnlyMember() throws Throwable
+    {
+        int initialNodes = 1;
+        int totalNodes = initialNodes + 1; // tokens and topology are sized for the extra node added below
+        IIsolatedExecutor.SerializableCallable<Boolean> rpcRunning = () -> StorageService.instance.isRPCServerRunning();
+        IIsolatedExecutor.SerializableRunnable stopRpc = () -> StorageService.instance.stopRPCServer();
+        IIsolatedExecutor.SerializableRunnable startRpc = () -> StorageService.instance.startRPCServer();
+
+        try (Cluster cluster = builder().withNodes(initialNodes)
+                                        .withTokenSupplier(evenlyDistributedTokens(totalNodes, 1))
+                                        .withNodeIdTopology(singleDcNetworkTopology(totalNodes, "dc0", "rack0"))
+                                        .withConfig(c -> c.with(NETWORK, GOSSIP, NATIVE_PROTOCOL))
+                                        .start())
+        {
+            IInstanceConfig nodeConfig = cluster.newInstanceConfig();
+            IInvokableInstance node = cluster.bootstrap(nodeConfig);
+            withProperty("cassandra.join_ring", Boolean.FALSE.toString(), () -> node.startup(cluster));
+            assertTrue(node.callOnInstance(rpcRunning));
+
+            node.runOnInstance(stopRpc);
+            assertFalse(node.callOnInstance(rpcRunning));
+
+            node.runOnInstance(startRpc);
+            assertTrue(node.callOnInstance(rpcRunning));
+        }
+    }
+}