Deprecate CloudstackSnitch and remove duplicate code in snitches

The patch also refactors existing cloud snitches to get rid of the duplicate code,
this is the logical follow-up of CASSANDRA-16555 where AbstractCloudMetadataServiceConnector was introduced.

patch by Stefan Miklosovic; reviewed by Jacek Lewandowski, Jackson Fleming and Maxwell Guo for CASSANDRA-18438
diff --git a/CHANGES.txt b/CHANGES.txt
index 371026c..8198ece 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 5.0
+ * Deprecate CloudstackSnitch and remove duplicate code in snitches (CASSANDRA-18438)
  * Add support for vectors in UDFs (CASSANDRA-18613)
  * Improve vector value validation errors (CASSANDRA-18652)
  * Upgrade Guava to 32.0.1 (CASSANDRA-18645)
diff --git a/NEWS.txt b/NEWS.txt
index 0faea57..db77d36 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -223,6 +223,8 @@
       for partition tombstones. That guardrail is based on the properties `partition_tombstones_warn_threshold` and
       `partition_tombstones_fail_threshold`. The warn threshold has a very similar behaviour to the old config property.
       The old property is still supported for backward compatibility, but now it is disabled by default.
+    - CloudstackSnitch is marked as deprecated and it is not actively maintained anymore. It is scheduled to be removed 
+      in the next major version of Cassandra.
 
 4.1
 ===
diff --git a/conf/cassandra-rackdc.properties b/conf/cassandra-rackdc.properties
index 2d17808..588d6a7 100644
--- a/conf/cassandra-rackdc.properties
+++ b/conf/cassandra-rackdc.properties
@@ -19,8 +19,7 @@
 dc=dc1
 rack=rack1
 
-# Add a suffix to a datacenter name. Used by the Ec2Snitch and Ec2MultiRegionSnitch
-# to append a string to the EC2 region name.
+# Add a suffix to a datacenter name. Used by all cloud-based snitches.
 #dc_suffix=
 
 # Uncomment the following line to make this snitch prefer the internal ip when possible, as the Ec2MultiRegionSnitch does.
@@ -43,3 +42,15 @@
 # If AWS IMDS of v2 is configured, ec2_metadata_token_ttl_seconds says how many seconds a token will be valid until
 # it is refreshed. Defaults to 21600. Can not be smaller than 30 and bigger than 21600. Has to be an integer.
 # ec2_metadata_token_ttl_seconds=21600
+
+# For all cloud-based snitches, there are following options available:
+#
+# Property to change metadata service url for a cloud-based snitch. The endpoint of a particular
+# snitch will be appended to this property. A user is not normally using this property, it is here
+# for tweaking the url of a service itself, e.g. for testing purposes.
+# metadata_url=http://some/service
+#
+# Sets a specified timeout value, in duration format, to be used when opening a communications link to metadata service,
+# referenced by an URLConnection. The timeout of zero (0s) is interpreted as an infinite timeout.
+# Defaults to 30 seconds.
+# metadata_request_timeout=30s
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index b8aa59e..942812a 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -1298,6 +1298,17 @@
 #    Proximity is determined by rack and data center, which are
 #    explicitly configured in cassandra-topology.properties.
 #
+# AlibabaCloudSnitch:
+#    Snitch for getting dc and rack of a node from metadata service of Alibaba cloud.
+#    This snitch that assumes an ECS region is a DC and an ECS availability_zone is a rack.
+#
+# CloudstackSnitch:
+#    A snitch that assumes a Cloudstack Zone follows the typical convention 
+#    country-location-az and uses a country/location tuple as a datacenter
+#    and the availability zone as a rack.
+#    WARNING: This snitch is deprecated and it is scheduled to be removed 
+#    in the next major version of Cassandra.
+#
 # Ec2Snitch:
 #    Appropriate for EC2 deployments in a single Region. Loads Region
 #    and Availability Zone information from the EC2 API. The Region is
@@ -1313,6 +1324,10 @@
 #    traffic, Cassandra will switch to the private IP after
 #    establishing a connection.)
 #
+# GoogleCloudSnitch:
+#    Snitch for getting dc and rack of a node from metadata service of Google cloud.
+#    This snitch that assumes an GCE region is a DC and an GCE availability_zone is a rack.
+#
 # RackInferringSnitch:
 #    Proximity is determined by rack and data center, which are
 #    assumed to correspond to the 3rd and 2nd octet of each node's IP
diff --git a/src/java/org/apache/cassandra/locator/AbstractCloudMetadataServiceConnector.java b/src/java/org/apache/cassandra/locator/AbstractCloudMetadataServiceConnector.java
index 6210ce4..026ab4f 100644
--- a/src/java/org/apache/cassandra/locator/AbstractCloudMetadataServiceConnector.java
+++ b/src/java/org/apache/cassandra/locator/AbstractCloudMetadataServiceConnector.java
@@ -22,30 +22,72 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.HttpURLConnection;
+import java.net.MalformedURLException;
+import java.net.URISyntaxException;
 import java.net.URL;
 import java.util.Map;
 
 import com.google.common.collect.ImmutableMap;
 
+import org.apache.cassandra.config.CassandraRelevantProperties;
+import org.apache.cassandra.config.DurationSpec;
+import org.apache.cassandra.exceptions.ConfigurationException;
+
+import static java.lang.String.format;
 import static java.nio.charset.StandardCharsets.UTF_8;
 
 abstract class AbstractCloudMetadataServiceConnector
 {
+    static final String METADATA_URL_PROPERTY = "metadata_url";
+    static final String METADATA_REQUEST_TIMEOUT_PROPERTY = "metadata_request_timeout";
+    static final String DEFAULT_METADATA_REQUEST_TIMEOUT = "30s";
+
     protected final String metadataServiceUrl;
+    protected final int requestTimeoutMs;
 
-    protected AbstractCloudMetadataServiceConnector(String metadataServiceUrl)
+    public AbstractCloudMetadataServiceConnector(SnitchProperties properties)
     {
-        this.metadataServiceUrl = metadataServiceUrl;
+        String parsedMetadataServiceUrl = properties.get(METADATA_URL_PROPERTY, null);
+
+        try
+        {
+            URL url = new URL(parsedMetadataServiceUrl);
+            url.toURI();
+
+            this.metadataServiceUrl = parsedMetadataServiceUrl;
+        }
+        catch (MalformedURLException | IllegalArgumentException | URISyntaxException ex)
+        {
+            throw new ConfigurationException(format("Snitch metadata service URL '%s' is invalid. Please review snitch properties " +
+                                                    "defined in the configured '%s' configuration file.",
+                                                    parsedMetadataServiceUrl,
+                                                    CassandraRelevantProperties.CASSANDRA_RACKDC_PROPERTIES.getKey()),
+                                             ex);
+        }
+
+
+        String metadataRequestTimeout = properties.get(METADATA_REQUEST_TIMEOUT_PROPERTY, DEFAULT_METADATA_REQUEST_TIMEOUT);
+
+        try
+        {
+            this.requestTimeoutMs = new DurationSpec.IntMillisecondsBound(metadataRequestTimeout).toMilliseconds();
+        }
+        catch (IllegalArgumentException ex)
+        {
+            throw new ConfigurationException(format("%s as value of %s is invalid duration! " + ex.getMessage(),
+                                                    metadataRequestTimeout,
+                                                    METADATA_REQUEST_TIMEOUT_PROPERTY));
+        }
     }
 
-    public String apiCall(String query) throws IOException
+    public final String apiCall(String query) throws IOException
     {
-        return apiCall(metadataServiceUrl, query, 200);
+        return apiCall(metadataServiceUrl, query, "GET", ImmutableMap.of(), 200);
     }
 
-    public String apiCall(String url, String query, int expectedResponseCode) throws IOException
+    public final String apiCall(String query, Map<String, String> extraHeaders) throws IOException
     {
-        return apiCall(url, query, "GET", ImmutableMap.of(), expectedResponseCode);
+        return apiCall(metadataServiceUrl, query, "GET", extraHeaders, 200);
     }
 
     public String apiCall(String url,
@@ -61,6 +103,7 @@
             conn = (HttpURLConnection) new URL(url + query).openConnection();
             extraHeaders.forEach(conn::setRequestProperty);
             conn.setRequestMethod(method);
+            conn.setConnectTimeout(requestTimeoutMs);
             if (conn.getResponseCode() != expectedResponseCode)
                 throw new HttpException(conn.getResponseCode(), conn.getResponseMessage());
 
@@ -84,6 +127,14 @@
         }
     }
 
+    @Override
+    public String toString()
+    {
+        return format("%s{%s=%s,%s=%s}", getClass().getName(),
+                      METADATA_URL_PROPERTY, metadataServiceUrl,
+                      METADATA_REQUEST_TIMEOUT_PROPERTY, requestTimeoutMs);
+    }
+
     public static final class HttpException extends IOException
     {
         public final int responseCode;
@@ -96,4 +147,12 @@
             this.responseMessage = responseMessage;
         }
     }
+
+    public static class DefaultCloudMetadataServiceConnector extends AbstractCloudMetadataServiceConnector
+    {
+        public DefaultCloudMetadataServiceConnector(SnitchProperties properties)
+        {
+            super(properties);
+        }
+    }
 }
diff --git a/src/java/org/apache/cassandra/locator/AbstractCloudMetadataServiceSnitch.java b/src/java/org/apache/cassandra/locator/AbstractCloudMetadataServiceSnitch.java
new file mode 100644
index 0000000..2e34f78
--- /dev/null
+++ b/src/java/org/apache/cassandra/locator/AbstractCloudMetadataServiceSnitch.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.locator;
+
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.gms.ApplicationState;
+import org.apache.cassandra.gms.EndpointState;
+import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
+
+import static java.lang.String.format;
+
+abstract class AbstractCloudMetadataServiceSnitch extends AbstractNetworkTopologySnitch
+{
+    static final Logger logger = LoggerFactory.getLogger(AbstractCloudMetadataServiceSnitch.class);
+
+    static final String DEFAULT_DC = "UNKNOWN-DC";
+    static final String DEFAULT_RACK = "UNKNOWN-RACK";
+
+    protected final AbstractCloudMetadataServiceConnector connector;
+    protected final SnitchProperties snitchProperties;
+
+    private final String localRack;
+    private final String localDc;
+
+    private Map<InetAddressAndPort, Map<String, String>> savedEndpoints;
+
+    public AbstractCloudMetadataServiceSnitch(AbstractCloudMetadataServiceConnector connector,
+                                              SnitchProperties snitchProperties,
+                                              Pair<String, String> dcAndRack)
+    {
+        this.connector = connector;
+        this.snitchProperties = snitchProperties;
+        this.localDc = dcAndRack.left;
+        this.localRack = dcAndRack.right;
+
+        logger.info(format("%s using datacenter: %s, rack: %s, connector: %s, properties: %s",
+                           getClass().getName(), getLocalDatacenter(), getLocalRack(), connector, snitchProperties));
+    }
+
+    @Override
+    public final String getLocalRack()
+    {
+        return localRack;
+    }
+
+    @Override
+    public final String getLocalDatacenter()
+    {
+        return localDc;
+    }
+
+    @Override
+    public final String getRack(InetAddressAndPort endpoint)
+    {
+        if (endpoint.equals(FBUtilities.getBroadcastAddressAndPort()))
+            return getLocalRack();
+        EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
+        if (state == null || state.getApplicationState(ApplicationState.RACK) == null)
+        {
+            if (savedEndpoints == null)
+                savedEndpoints = SystemKeyspace.loadDcRackInfo();
+            if (savedEndpoints.containsKey(endpoint))
+                return savedEndpoints.get(endpoint).get("rack");
+            return DEFAULT_RACK;
+        }
+        return state.getApplicationState(ApplicationState.RACK).value;
+    }
+
+    @Override
+    public final String getDatacenter(InetAddressAndPort endpoint)
+    {
+        if (endpoint.equals(FBUtilities.getBroadcastAddressAndPort()))
+            return getLocalDatacenter();
+        EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
+        if (state == null || state.getApplicationState(ApplicationState.DC) == null)
+        {
+            if (savedEndpoints == null)
+                savedEndpoints = SystemKeyspace.loadDcRackInfo();
+            if (savedEndpoints.containsKey(endpoint))
+                return savedEndpoints.get(endpoint).get("data_center");
+            return DEFAULT_DC;
+        }
+        return state.getApplicationState(ApplicationState.DC).value;
+    }
+}
diff --git a/src/java/org/apache/cassandra/locator/AlibabaCloudSnitch.java b/src/java/org/apache/cassandra/locator/AlibabaCloudSnitch.java
index 729e1b3..2c76bf5 100644
--- a/src/java/org/apache/cassandra/locator/AlibabaCloudSnitch.java
+++ b/src/java/org/apache/cassandra/locator/AlibabaCloudSnitch.java
@@ -17,130 +17,41 @@
  */
 package org.apache.cassandra.locator;
 
-import java.io.DataInputStream;
-import java.io.FilterInputStream;
 import java.io.IOException;
-import java.net.HttpURLConnection;
-import java.net.MalformedURLException;
-import java.net.SocketTimeoutException;
-import java.net.URL;
-import java.nio.charset.StandardCharsets;
-import java.util.Map;
-import org.apache.cassandra.db.SystemKeyspace;
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.gms.ApplicationState;
-import org.apache.cassandra.gms.EndpointState;
-import org.apache.cassandra.gms.Gossiper;
-import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.utils.FBUtilities;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.ImmutableMap;
+
+import org.apache.cassandra.locator.AbstractCloudMetadataServiceConnector.DefaultCloudMetadataServiceConnector;
+
+import static org.apache.cassandra.locator.AbstractCloudMetadataServiceConnector.METADATA_URL_PROPERTY;
 
 /**
- *  A snitch that assumes an ECS region is a DC and an ECS availability_zone
- *  is a rack. This information is available in the config for the node. the 
- *  format of the zone-id is like :cn-hangzhou-a where cn means china, hangzhou
- *  means the hangzhou region, a means the az id. We use cn-hangzhou as the dc,
- *  and f as the zone-id.
+ * A snitch that assumes an ECS region is a DC and an ECS availability_zone
+ * is a rack. This information is available in the config for the node. the
+ * format of the zone-id is like 'cn-hangzhou-a' where cn means china, hangzhou
+ * means the hangzhou region, a means the az id. We use 'cn-hangzhou' as the dc,
+ * and 'a' as the zone-id.
  */
-public class AlibabaCloudSnitch extends AbstractNetworkTopologySnitch
+public class AlibabaCloudSnitch extends AbstractCloudMetadataServiceSnitch
 {
-    protected static final Logger logger = LoggerFactory.getLogger(AlibabaCloudSnitch.class);
-    protected static final String ZONE_NAME_QUERY_URL = "http://100.100.100.200/latest/meta-data/zone-id";
-    private static final String DEFAULT_DC = "UNKNOWN-DC";
-    private static final String DEFAULT_RACK = "UNKNOWN-RACK";
-    private Map<InetAddressAndPort, Map<String, String>> savedEndpoints; 
-    protected String ecsZone;
-    protected String ecsRegion;
-    
-    private static final int HTTP_CONNECT_TIMEOUT = 30000;
-    
-    
-    public AlibabaCloudSnitch() throws MalformedURLException, IOException 
+    static final String DEFAULT_METADATA_SERVICE_URL = "http://100.100.100.200";
+    static final String ZONE_NAME_QUERY_URL = "/latest/meta-data/zone-id";
+
+    public AlibabaCloudSnitch() throws IOException
     {
-        String response = alibabaApiCall(ZONE_NAME_QUERY_URL);
-        String[] splits = response.split("/");
-        String az = splits[splits.length - 1];
-
-        // Split "us-central1-a" or "asia-east1-a" into "us-central1"/"a" and "asia-east1"/"a".
-        splits = az.split("-");
-        ecsZone = splits[splits.length - 1];
-
-        int lastRegionIndex = az.lastIndexOf("-");
-        ecsRegion = az.substring(0, lastRegionIndex);
-
-        String datacenterSuffix = (new SnitchProperties()).get("dc_suffix", "");
-        ecsRegion = ecsRegion.concat(datacenterSuffix);
-        logger.info("AlibabaSnitch using region: {}, zone: {}.", ecsRegion, ecsZone);
-    
-    }
-    
-    String alibabaApiCall(String url) throws ConfigurationException, IOException, SocketTimeoutException
-    {
-        // Populate the region and zone by introspection, fail if 404 on metadata
-        HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
-        DataInputStream d = null;
-        try
-        {
-            conn.setConnectTimeout(HTTP_CONNECT_TIMEOUT);
-            conn.setRequestMethod("GET");
-            
-            int code = conn.getResponseCode();
-            if (code != HttpURLConnection.HTTP_OK)
-                throw new ConfigurationException("AlibabaSnitch was unable to execute the API call. Not an ecs node? and the returun code is " + code);
-
-            // Read the information. I wish I could say (String) conn.getContent() here...
-            int cl = conn.getContentLength();
-            byte[] b = new byte[cl];
-            d = new DataInputStream((FilterInputStream) conn.getContent());
-            d.readFully(b);
-            return new String(b, StandardCharsets.UTF_8);
-        }
-        catch (SocketTimeoutException e)
-        {
-            throw new SocketTimeoutException("Timeout occurred reading a response from the Alibaba ECS metadata");
-        }
-        finally
-        {
-            FileUtils.close(d);
-            conn.disconnect();
-        }
-    }
-    
-    @Override
-    public String getRack(InetAddressAndPort endpoint)
-    {
-        if (endpoint.equals(FBUtilities.getBroadcastAddressAndPort()))
-            return ecsZone;
-        EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
-        if (state == null || state.getApplicationState(ApplicationState.RACK) == null)
-        {
-            if (savedEndpoints == null)
-                savedEndpoints = SystemKeyspace.loadDcRackInfo();
-            if (savedEndpoints.containsKey(endpoint))
-                return savedEndpoints.get(endpoint).get("rack");
-            return DEFAULT_RACK;
-        }
-        return state.getApplicationState(ApplicationState.RACK).value;
-    
+        this(new SnitchProperties());
     }
 
-    @Override
-    public String getDatacenter(InetAddressAndPort endpoint) 
+    public AlibabaCloudSnitch(SnitchProperties properties) throws IOException
     {
-        if (endpoint.equals(FBUtilities.getBroadcastAddressAndPort()))
-            return ecsRegion;
-        EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
-        if (state == null || state.getApplicationState(ApplicationState.DC) == null)
-        {
-            if (savedEndpoints == null)
-                savedEndpoints = SystemKeyspace.loadDcRackInfo();
-            if (savedEndpoints.containsKey(endpoint))
-                return savedEndpoints.get(endpoint).get("data_center");
-            return DEFAULT_DC;
-        }
-        return state.getApplicationState(ApplicationState.DC).value;
-    
+        this(properties, new DefaultCloudMetadataServiceConnector(properties.putIfAbsent(METADATA_URL_PROPERTY,
+                                                                                         DEFAULT_METADATA_SERVICE_URL)));
     }
 
+    public AlibabaCloudSnitch(SnitchProperties properties, AbstractCloudMetadataServiceConnector connector) throws IOException
+    {
+        super(connector, properties, SnitchUtils.parseDcAndRack(connector.apiCall(ZONE_NAME_QUERY_URL,
+                                                                                  ImmutableMap.of()),
+                                                                properties.getDcSuffix()));
+    }
 }
diff --git a/src/java/org/apache/cassandra/locator/CloudstackSnitch.java b/src/java/org/apache/cassandra/locator/CloudstackSnitch.java
index d857953..a0c4947 100644
--- a/src/java/org/apache/cassandra/locator/CloudstackSnitch.java
+++ b/src/java/org/apache/cassandra/locator/CloudstackSnitch.java
@@ -17,140 +17,79 @@
  */
 package org.apache.cassandra.locator;
 
-import java.io.DataInputStream;
-import java.io.BufferedInputStream;
 import java.io.BufferedReader;
 import java.io.IOException;
-import java.net.HttpURLConnection;
-import java.net.URL;
 import java.net.URI;
-import java.nio.charset.StandardCharsets;
-import java.util.Map;
-import java.util.regex.Pattern;
+import java.util.Properties;
 import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
+import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.util.File;
 import org.apache.cassandra.io.util.FileReader;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.cassandra.db.SystemKeyspace;
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.gms.ApplicationState;
-import org.apache.cassandra.gms.EndpointState;
-import org.apache.cassandra.gms.Gossiper;
-import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.JVMStabilityInspector;
+import org.apache.cassandra.utils.Pair;
+
+import static org.apache.cassandra.locator.AbstractCloudMetadataServiceConnector.METADATA_URL_PROPERTY;
+import static org.apache.cassandra.locator.CloudstackSnitch.CloudstackConnector.ZONE_NAME_QUERY_URI;
 
 /**
- * {@code
  * A snitch that assumes a Cloudstack Zone follows the typical convention
- * <country>-<location>-<availability zone> and uses the country/location
+ * {@code country-location-availability zone} and uses the country/location
  * tuple as a datacenter and the availability zone as a rack
- * }
+ *
+ * This snitch is deprecated, and it is eligible for the removal in the next major release of Cassandra.
  */
-
-public class CloudstackSnitch extends AbstractNetworkTopologySnitch
+@Deprecated
+public class CloudstackSnitch extends AbstractCloudMetadataServiceSnitch
 {
-    protected static final Logger logger = LoggerFactory.getLogger(CloudstackSnitch.class);
-    protected static final String ZONE_NAME_QUERY_URI = "/latest/meta-data/availability-zone";
-
-    private Map<InetAddressAndPort, Map<String, String>> savedEndpoints;
-
-    private static final String DEFAULT_DC = "UNKNOWN-DC";
-    private static final String DEFAULT_RACK = "UNKNOWN-RACK";
     private static final String[] LEASE_FILES =
     {
-        "file:///var/lib/dhcp/dhclient.eth0.leases",
-        "file:///var/lib/dhclient/dhclient.eth0.leases"
+    "file:///var/lib/dhcp/dhclient.eth0.leases",
+    "file:///var/lib/dhclient/dhclient.eth0.leases"
     };
 
-    protected String csZoneDc;
-    protected String csZoneRack;
-
-    public CloudstackSnitch() throws IOException, ConfigurationException
+    public CloudstackSnitch() throws IOException
     {
-        String endpoint = csMetadataEndpoint();
-        String zone = csQueryMetadata(endpoint + ZONE_NAME_QUERY_URI);
-        String zone_parts[] = zone.split("-");
+        this(new SnitchProperties(new Properties()));
+    }
 
-        if (zone_parts.length != 3)
+    public CloudstackSnitch(SnitchProperties snitchProperties) throws IOException
+    {
+        this(snitchProperties, new CloudstackConnector(snitchProperties.putIfAbsent(METADATA_URL_PROPERTY, csMetadataEndpoint())));
+    }
+
+    public CloudstackSnitch(SnitchProperties properties, AbstractCloudMetadataServiceConnector connector) throws IOException
+    {
+        super(connector, properties, resolveDcAndRack(connector));
+        logger.warn("{} is deprecated and not actively maintained. It will be removed in the next " +
+                    "major version of Cassandra.", CloudstackSnitch.class.getName());
+    }
+
+    static class CloudstackConnector extends AbstractCloudMetadataServiceConnector
+    {
+        static final String ZONE_NAME_QUERY_URI = "/latest/meta-data/availability-zone";
+
+        protected CloudstackConnector(SnitchProperties properties)
         {
+            super(properties);
+        }
+    }
+
+    private static Pair<String, String> resolveDcAndRack(AbstractCloudMetadataServiceConnector connector) throws IOException
+    {
+        String zone = connector.apiCall(ZONE_NAME_QUERY_URI);
+        String[] zoneParts = zone.split("-");
+
+        if (zoneParts.length != 3)
             throw new ConfigurationException("CloudstackSnitch cannot handle invalid zone format: " + zone);
-        }
-        csZoneDc = zone_parts[0] + "-" + zone_parts[1];
-        csZoneRack = zone_parts[2];
+
+        return Pair.create(zoneParts[0] + '-' + zoneParts[1], zoneParts[2]);
     }
 
-    public String getRack(InetAddressAndPort endpoint)
+    private static String csMetadataEndpoint() throws ConfigurationException
     {
-        if (endpoint.equals(FBUtilities.getBroadcastAddressAndPort()))
-            return csZoneRack;
-        EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
-        if (state == null || state.getApplicationState(ApplicationState.RACK) == null)
-        {
-            if (savedEndpoints == null)
-                savedEndpoints = SystemKeyspace.loadDcRackInfo();
-            if (savedEndpoints.containsKey(endpoint))
-                return savedEndpoints.get(endpoint).get("rack");
-            return DEFAULT_RACK;
-        }
-        return state.getApplicationState(ApplicationState.RACK).value;
-    }
-
-    public String getDatacenter(InetAddressAndPort endpoint)
-    {
-        if (endpoint.equals(FBUtilities.getBroadcastAddressAndPort()))
-            return csZoneDc;
-        EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
-        if (state == null || state.getApplicationState(ApplicationState.DC) == null)
-        {
-            if (savedEndpoints == null)
-                savedEndpoints = SystemKeyspace.loadDcRackInfo();
-            if (savedEndpoints.containsKey(endpoint))
-                return savedEndpoints.get(endpoint).get("data_center");
-            return DEFAULT_DC;
-        }
-        return state.getApplicationState(ApplicationState.DC).value;
-    }
-
-    String csQueryMetadata(String url) throws ConfigurationException, IOException
-    {
-        HttpURLConnection conn = null;
-        DataInputStream is = null;
-
-        try
-        {
-            conn = (HttpURLConnection) new URL(url).openConnection();
-        }
-        catch (Exception e)
-        {
-            throw new ConfigurationException("CloudstackSnitch cannot query wrong metadata URL: " + url);
-        }
-        try
-        {
-            conn.setRequestMethod("GET");
-            if (conn.getResponseCode() != 200)
-            {
-                throw new ConfigurationException("CloudstackSnitch was unable to query metadata.");
-            }
-
-            int cl = conn.getContentLength();
-            byte[] b = new byte[cl];
-            is = new DataInputStream(new BufferedInputStream(conn.getInputStream()));
-            is.readFully(b);
-            return new String(b, StandardCharsets.UTF_8);
-        }
-        finally
-        {
-            FileUtils.close(is);
-            conn.disconnect();
-        }
-    }
-
-    String csMetadataEndpoint() throws ConfigurationException
-    {
-        for (String lease_uri: LEASE_FILES)
+        for (String lease_uri : LEASE_FILES)
         {
             try
             {
@@ -163,15 +102,13 @@
             catch (Exception e)
             {
                 JVMStabilityInspector.inspectThrowable(e);
-                continue;
             }
-
         }
 
         throw new ConfigurationException("No valid DHCP lease file could be found.");
     }
 
-    String csEndpointFromLease(File lease) throws ConfigurationException
+    private static String csEndpointFromLease(File lease) throws ConfigurationException
     {
         String line;
         String endpoint = null;
diff --git a/src/java/org/apache/cassandra/locator/Ec2MetadataServiceConnector.java b/src/java/org/apache/cassandra/locator/Ec2MetadataServiceConnector.java
index b5ab2b7..f975ca9 100644
--- a/src/java/org/apache/cassandra/locator/Ec2MetadataServiceConnector.java
+++ b/src/java/org/apache/cassandra/locator/Ec2MetadataServiceConnector.java
@@ -45,7 +45,7 @@
 
     Ec2MetadataServiceConnector(SnitchProperties properties)
     {
-        super(properties.get(EC2_METADATA_URL_PROPERTY, DEFAULT_EC2_METADATA_URL));
+        super(properties.putIfAbsent(METADATA_URL_PROPERTY, properties.get(EC2_METADATA_URL_PROPERTY, DEFAULT_EC2_METADATA_URL)));
     }
 
     enum EC2MetadataType
@@ -94,7 +94,7 @@
         @Override
         public String toString()
         {
-            return String.format("%s{%s=%s}", V1Connector.class.getName(), EC2_METADATA_URL_PROPERTY, metadataServiceUrl);
+            return String.format("%s{%s=%s}", V1Connector.class.getName(), METADATA_URL_PROPERTY, metadataServiceUrl);
         }
     }
 
@@ -184,7 +184,7 @@
         {
             return String.format("%s{%s=%s,%s=%s}",
                                  V2Connector.class.getName(),
-                                 EC2_METADATA_URL_PROPERTY, metadataServiceUrl,
+                                 METADATA_URL_PROPERTY, metadataServiceUrl,
                                  AWS_EC2_METADATA_TOKEN_TTL_SECONDS_HEADER_PROPERTY,
                                  tokenTTL.getSeconds());
         }
diff --git a/src/java/org/apache/cassandra/locator/Ec2MultiRegionSnitch.java b/src/java/org/apache/cassandra/locator/Ec2MultiRegionSnitch.java
index 1ddb2b8..b4dcfca 100644
--- a/src/java/org/apache/cassandra/locator/Ec2MultiRegionSnitch.java
+++ b/src/java/org/apache/cassandra/locator/Ec2MultiRegionSnitch.java
@@ -58,7 +58,7 @@
         this(props, Ec2MetadataServiceConnector.create(props));
     }
 
-    Ec2MultiRegionSnitch(SnitchProperties props, Ec2MetadataServiceConnector connector) throws IOException
+    Ec2MultiRegionSnitch(SnitchProperties props, AbstractCloudMetadataServiceConnector connector) throws IOException
     {
         super(props, connector);
         InetAddress localPublicAddress = InetAddress.getByName(connector.apiCall(PUBLIC_IP_QUERY));
@@ -88,6 +88,6 @@
         }
         Gossiper.instance.addLocalApplicationState(ApplicationState.INTERNAL_ADDRESS_AND_PORT, StorageService.instance.valueFactory.internalAddressAndPort(address));
         Gossiper.instance.addLocalApplicationState(ApplicationState.INTERNAL_IP, StorageService.instance.valueFactory.internalIP(address.getAddress()));
-        Gossiper.instance.register(new ReconnectableSnitchHelper(this, ec2region, true));
+        Gossiper.instance.register(new ReconnectableSnitchHelper(this, getLocalDatacenter(), true));
     }
 }
diff --git a/src/java/org/apache/cassandra/locator/Ec2Snitch.java b/src/java/org/apache/cassandra/locator/Ec2Snitch.java
index bf72a5f..056a265 100644
--- a/src/java/org/apache/cassandra/locator/Ec2Snitch.java
+++ b/src/java/org/apache/cassandra/locator/Ec2Snitch.java
@@ -18,19 +18,12 @@
 package org.apache.cassandra.locator;
 
 import java.io.IOException;
-import java.util.Map;
 import java.util.Set;
 
 import com.google.common.annotations.VisibleForTesting;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.gms.ApplicationState;
-import org.apache.cassandra.gms.EndpointState;
-import org.apache.cassandra.gms.Gossiper;
-import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
 
 /**
  * A snitch that assumes an EC2 region is a DC and an EC2 availability_zone
@@ -52,10 +45,8 @@
  * which is by default set to {@link Ec2MetadataServiceConnector.V2Connector#MAX_TOKEN_TIME_IN_SECONDS}. TTL has
  * to be an integer from the range [30, 21600].
  */
-public class Ec2Snitch extends AbstractNetworkTopologySnitch
+public class Ec2Snitch extends AbstractCloudMetadataServiceSnitch
 {
-    protected static final Logger logger = LoggerFactory.getLogger(Ec2Snitch.class);
-
     private static final String SNITCH_PROP_NAMING_SCHEME = "ec2_naming_scheme";
     static final String EC2_NAMING_LEGACY = "legacy";
     private static final String EC2_NAMING_STANDARD = "standard";
@@ -63,17 +54,8 @@
     @VisibleForTesting
     public static final String ZONE_NAME_QUERY = "/latest/meta-data/placement/availability-zone";
 
-    private static final String DEFAULT_DC = "UNKNOWN-DC";
-    private static final String DEFAULT_RACK = "UNKNOWN-RACK";
-
-    final String ec2region;
-    private final String ec2zone;
     private final boolean usingLegacyNaming;
 
-    private Map<InetAddressAndPort, Map<String, String>> savedEndpoints;
-
-    protected final Ec2MetadataServiceConnector connector;
-
     public Ec2Snitch() throws IOException, ConfigurationException
     {
         this(new SnitchProperties());
@@ -84,20 +66,27 @@
         this(props, Ec2MetadataServiceConnector.create(props));
     }
 
-    Ec2Snitch(SnitchProperties props, Ec2MetadataServiceConnector connector) throws IOException
+    Ec2Snitch(SnitchProperties props, AbstractCloudMetadataServiceConnector connector) throws IOException
     {
-        this.connector = connector;
+        super(connector, props, getDcAndRack(props, connector));
+        usingLegacyNaming = isUsingLegacyNaming(props);
+    }
+
+    private static Pair<String, String> getDcAndRack(SnitchProperties props, AbstractCloudMetadataServiceConnector connector) throws IOException
+    {
         String az = connector.apiCall(ZONE_NAME_QUERY);
 
         // if using the full naming scheme, region name is created by removing letters from the
         // end of the availability zone and zone is the full zone name
-        usingLegacyNaming = isUsingLegacyNaming(props);
+        boolean usingLegacyNaming = isUsingLegacyNaming(props);
         String region;
+        String localDc;
+        String localRack;
         if (usingLegacyNaming)
         {
             // Split "us-east-1a" or "asia-1a" into "us-east"/"1a" and "asia"/"1a".
             String[] splits = az.split("-");
-            ec2zone = splits[splits.length - 1];
+            localRack = splits[splits.length - 1];
 
             // hack for CASSANDRA-4026
             region = az.substring(0, az.length() - 1);
@@ -109,12 +98,12 @@
             // grab the region name, which is embedded in the availability zone name.
             // thus an AZ of "us-east-1a" yields the region name "us-east-1"
             region = az.replaceFirst("[a-z]+$","");
-            ec2zone = az;
+            localRack = az;
         }
 
-        String datacenterSuffix = props.get("dc_suffix", "");
-        ec2region = region.concat(datacenterSuffix);
-        logger.info("EC2Snitch using region: {}, zone: {}, properties: {}", ec2region, ec2zone, connector);
+        localDc = region.concat(props.getDcSuffix());
+
+        return Pair.create(localDc, localRack);
     }
 
     private static boolean isUsingLegacyNaming(SnitchProperties props)
@@ -122,38 +111,6 @@
         return props.get(SNITCH_PROP_NAMING_SCHEME, EC2_NAMING_STANDARD).equalsIgnoreCase(EC2_NAMING_LEGACY);
     }
 
-    public String getRack(InetAddressAndPort endpoint)
-    {
-        if (endpoint.equals(FBUtilities.getBroadcastAddressAndPort()))
-            return ec2zone;
-        EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
-        if (state == null || state.getApplicationState(ApplicationState.RACK) == null)
-        {
-            if (savedEndpoints == null)
-                savedEndpoints = SystemKeyspace.loadDcRackInfo();
-            if (savedEndpoints.containsKey(endpoint))
-                return savedEndpoints.get(endpoint).get("rack");
-            return DEFAULT_RACK;
-        }
-        return state.getApplicationState(ApplicationState.RACK).value;
-    }
-
-    public String getDatacenter(InetAddressAndPort endpoint)
-    {
-        if (endpoint.equals(FBUtilities.getBroadcastAddressAndPort()))
-            return ec2region;
-        EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
-        if (state == null || state.getApplicationState(ApplicationState.DC) == null)
-        {
-            if (savedEndpoints == null)
-                savedEndpoints = SystemKeyspace.loadDcRackInfo();
-            if (savedEndpoints.containsKey(endpoint))
-                return savedEndpoints.get(endpoint).get("data_center");
-            return DEFAULT_DC;
-        }
-        return state.getApplicationState(ApplicationState.DC).value;
-    }
-
     @Override
     public boolean validate(Set<String> datacenters, Set<String> racks)
     {
@@ -179,7 +136,10 @@
             // We can still identify as legacy the dc names without a number as a suffix like us-east"
             boolean dcUsesLegacyFormat = dc.matches("^[a-z]+-[a-z]+$");
             if (dcUsesLegacyFormat && !usingLegacyNaming)
+            {
                 valid = false;
+                break;
+            }
         }
 
         for (String rack : racks)
@@ -190,7 +150,10 @@
             // NOTE: the allowed custom suffix only applies to datacenter (region) names, not availability zones.
             boolean rackUsesLegacyFormat = rack.matches("[\\d][a-z]");
             if (rackUsesLegacyFormat != usingLegacyNaming)
+            {
                 valid = false;
+                break;
+            }
         }
 
         if (!valid)
diff --git a/src/java/org/apache/cassandra/locator/GoogleCloudSnitch.java b/src/java/org/apache/cassandra/locator/GoogleCloudSnitch.java
index 1e1c500..4472631 100644
--- a/src/java/org/apache/cassandra/locator/GoogleCloudSnitch.java
+++ b/src/java/org/apache/cassandra/locator/GoogleCloudSnitch.java
@@ -17,111 +17,38 @@
  */
 package org.apache.cassandra.locator;
 
-import java.io.DataInputStream;
-import java.io.FilterInputStream;
 import java.io.IOException;
-import java.net.HttpURLConnection;
-import java.net.URL;
-import java.nio.charset.StandardCharsets;
-import java.util.Map;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.cassandra.db.SystemKeyspace;
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.gms.ApplicationState;
-import org.apache.cassandra.gms.EndpointState;
-import org.apache.cassandra.gms.Gossiper;
-import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.utils.FBUtilities;
+import com.google.common.collect.ImmutableMap;
+
+import org.apache.cassandra.locator.AbstractCloudMetadataServiceConnector.DefaultCloudMetadataServiceConnector;
+
+import static org.apache.cassandra.locator.AbstractCloudMetadataServiceConnector.METADATA_URL_PROPERTY;
 
 /**
  * A snitch that assumes an GCE region is a DC and an GCE availability_zone
- *  is a rack. This information is available in the config for the node.
+ * is a rack. This information is available in the config for the node.
  */
-public class GoogleCloudSnitch extends AbstractNetworkTopologySnitch
+public class GoogleCloudSnitch extends AbstractCloudMetadataServiceSnitch
 {
-    protected static final Logger logger = LoggerFactory.getLogger(GoogleCloudSnitch.class);
-    protected static final String ZONE_NAME_QUERY_URL = "http://metadata.google.internal/computeMetadata/v1/instance/zone";
-    private static final String DEFAULT_DC = "UNKNOWN-DC";
-    private static final String DEFAULT_RACK = "UNKNOWN-RACK";
-    private Map<InetAddressAndPort, Map<String, String>> savedEndpoints;
-    protected String gceZone;
-    protected String gceRegion;
+    static final String DEFAULT_METADATA_SERVICE_URL = "http://metadata.google.internal";
+    static final String ZONE_NAME_QUERY_URL = "/computeMetadata/v1/instance/zone";
 
-    public GoogleCloudSnitch() throws IOException, ConfigurationException
+    public GoogleCloudSnitch() throws IOException
     {
-        String response = gceApiCall(ZONE_NAME_QUERY_URL);
-        String[] splits = response.split("/");
-        String az = splits[splits.length - 1];
-
-        // Split "us-central1-a" or "asia-east1-a" into "us-central1"/"a" and "asia-east1"/"a".
-        splits = az.split("-");
-        gceZone = splits[splits.length - 1];
-
-        int lastRegionIndex = az.lastIndexOf("-");
-        gceRegion = az.substring(0, lastRegionIndex);
-
-        String datacenterSuffix = (new SnitchProperties()).get("dc_suffix", "");
-        gceRegion = gceRegion.concat(datacenterSuffix);
-        logger.info("GCESnitch using region: {}, zone: {}.", gceRegion, gceZone);
+        this(new SnitchProperties());
     }
 
-    String gceApiCall(String url) throws IOException, ConfigurationException
+    public GoogleCloudSnitch(SnitchProperties properties) throws IOException
     {
-        // Populate the region and zone by introspection, fail if 404 on metadata
-        HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
-        DataInputStream d = null;
-        try
-        {
-            conn.setRequestMethod("GET");
-	    conn.setRequestProperty("Metadata-Flavor", "Google");
-            if (conn.getResponseCode() != 200)
-                throw new ConfigurationException("GoogleCloudSnitch was unable to execute the API call. Not a gce node?");
-
-            // Read the information.
-            int cl = conn.getContentLength();
-            byte[] b = new byte[cl];
-            d = new DataInputStream((FilterInputStream) conn.getContent());
-            d.readFully(b);
-            return new String(b, StandardCharsets.UTF_8);
-        }
-        finally
-        {
-            FileUtils.close(d);
-            conn.disconnect();
-        }
+        this(properties, new DefaultCloudMetadataServiceConnector(properties.putIfAbsent(METADATA_URL_PROPERTY,
+                                                                                         DEFAULT_METADATA_SERVICE_URL)));
     }
 
-    public String getRack(InetAddressAndPort endpoint)
+    public GoogleCloudSnitch(SnitchProperties properties, AbstractCloudMetadataServiceConnector connector) throws IOException
     {
-        if (endpoint.equals(FBUtilities.getBroadcastAddressAndPort()))
-            return gceZone;
-        EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
-        if (state == null || state.getApplicationState(ApplicationState.RACK) == null)
-        {
-            if (savedEndpoints == null)
-                savedEndpoints = SystemKeyspace.loadDcRackInfo();
-            if (savedEndpoints.containsKey(endpoint))
-                return savedEndpoints.get(endpoint).get("rack");
-            return DEFAULT_RACK;
-        }
-        return state.getApplicationState(ApplicationState.RACK).value;
-    }
-
-    public String getDatacenter(InetAddressAndPort endpoint)
-    {
-        if (endpoint.equals(FBUtilities.getBroadcastAddressAndPort()))
-            return gceRegion;
-        EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
-        if (state == null || state.getApplicationState(ApplicationState.DC) == null)
-        {
-            if (savedEndpoints == null)
-                savedEndpoints = SystemKeyspace.loadDcRackInfo();
-            if (savedEndpoints.containsKey(endpoint))
-                return savedEndpoints.get(endpoint).get("data_center");
-            return DEFAULT_DC;
-        }
-        return state.getApplicationState(ApplicationState.DC).value;
+        super(connector, properties, SnitchUtils.parseDcAndRack(connector.apiCall(ZONE_NAME_QUERY_URL,
+                                                                                  ImmutableMap.of("Metadata-Flavor", "Google")),
+                                                                properties.getDcSuffix()));
     }
 }
diff --git a/src/java/org/apache/cassandra/locator/SnitchProperties.java b/src/java/org/apache/cassandra/locator/SnitchProperties.java
index efd2930..1745b79 100644
--- a/src/java/org/apache/cassandra/locator/SnitchProperties.java
+++ b/src/java/org/apache/cassandra/locator/SnitchProperties.java
@@ -24,6 +24,8 @@
 import com.google.common.annotations.VisibleForTesting;
 
 import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.utils.Pair;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -69,6 +71,14 @@
         this.properties = properties;
     }
 
+    @SafeVarargs
+    public SnitchProperties(Pair<String, String>... pairs)
+    {
+        properties = new Properties();
+        for (Pair<String, String> pair : pairs)
+            properties.setProperty(pair.left, pair.right);
+    }
+
     /**
      * Get a snitch property value or return defaultValue if not defined.
      */
@@ -77,8 +87,46 @@
         return properties.getProperty(propertyName, defaultValue);
     }
 
+    public SnitchProperties add(String key, String value)
+    {
+        properties.put(key, value);
+        return this;
+    }
+
+    /**
+     * Returns this instance of snitch properties if key is present
+     * otherwise create new instance of properties and put key with a give value into it
+     *
+     * @param key key to add
+     * @param value value to add
+     * @return same properties if key is present or new object with added key and value if not
+     */
+    public SnitchProperties putIfAbsent(String key, String value)
+    {
+        if (contains(key))
+            return this;
+
+        Properties p = new Properties();
+        p.putAll(this.properties);
+        p.put(key, value);
+        return new SnitchProperties(p);
+    }
+
     public boolean contains(String propertyName)
     {
         return properties.containsKey(propertyName);
     }
+
+    public String getDcSuffix()
+    {
+        return properties.getProperty("dc_suffix", "");
+    }
+
+    @Override
+    public String toString()
+    {
+        return "SnitchProperties{" +
+               "properties=" + (properties != null ? properties.toString() : "null") +
+               '}';
+    }
 }
diff --git a/src/java/org/apache/cassandra/locator/SnitchUtils.java b/src/java/org/apache/cassandra/locator/SnitchUtils.java
new file mode 100644
index 0000000..65c1f28
--- /dev/null
+++ b/src/java/org/apache/cassandra/locator/SnitchUtils.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.locator;
+
+import org.apache.cassandra.utils.Pair;
+
+import static java.lang.String.format;
+
+public class SnitchUtils
+{
+    private SnitchUtils() {}
+
+    static Pair<String, String> parseDcAndRack(String response, String dcSuffix)
+    {
+        String[] splits = response.split("/");
+        String az = splits[splits.length - 1];
+
+        splits = az.split("-");
+        String localRack = splits[splits.length - 1];
+
+        int lastRegionIndex = az.lastIndexOf('-');
+
+        // we would hit StringIndexOutOfBoundsException on the az.substring method if we did not do this
+        if (lastRegionIndex == -1)
+            throw new IllegalStateException(format("%s does not contain at least one '-' to differentiate " +
+                                                   "between datacenter and rack", response));
+
+        String localDc = az.substring(0, lastRegionIndex);
+
+        localDc = localDc.concat(dcSuffix);
+
+        return Pair.create(localDc, localRack);
+    }
+}
diff --git a/test/unit/org/apache/cassandra/locator/AlibabaCloudSnitchTest.java b/test/unit/org/apache/cassandra/locator/AlibabaCloudSnitchTest.java
index 82108f2..f733009 100644
--- a/test/unit/org/apache/cassandra/locator/AlibabaCloudSnitchTest.java
+++ b/test/unit/org/apache/cassandra/locator/AlibabaCloudSnitchTest.java
@@ -20,6 +20,7 @@
 import java.io.IOException;
 import java.util.EnumMap;
 import java.util.Map;
+import java.util.Properties;
 
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -38,10 +39,14 @@
 import static org.apache.cassandra.ServerTestUtils.mkdirs;
 import static org.apache.cassandra.config.CassandraRelevantProperties.GOSSIP_DISABLE_THREAD_VALIDATION;
 import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyMap;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
-public class AlibabaCloudSnitchTest 
+public class AlibabaCloudSnitchTest
 {
-    private static String az;
+    static String az;
 
     @BeforeClass
     public static void setup() throws Exception
@@ -56,25 +61,15 @@
         StorageService.instance.initServer(0);
     }
 
-    private class TestAlibabaCloudSnitch extends AlibabaCloudSnitch
-    {
-        public TestAlibabaCloudSnitch() throws IOException, ConfigurationException
-        {
-            super();
-        }
-
-        @Override
-        String alibabaApiCall(String url) throws IOException, ConfigurationException
-        {
-            return az;
-        }
-    }
-
     @Test
     public void testRac() throws IOException, ConfigurationException
     {
         az = "cn-hangzhou-f";
-        AlibabaCloudSnitch snitch = new TestAlibabaCloudSnitch();
+
+        AbstractCloudMetadataServiceConnector mock = mock(AbstractCloudMetadataServiceConnector.class);
+        when(mock.apiCall(any(), anyMap())).thenReturn(az);
+
+        AlibabaCloudSnitch snitch = new AlibabaCloudSnitch(new SnitchProperties(new Properties()), mock);
         InetAddressAndPort local = InetAddressAndPort.getByName("127.0.0.1");
         InetAddressAndPort nonlocal = InetAddressAndPort.getByName("127.0.0.7");
 
@@ -90,12 +85,16 @@
         assertEquals("cn-hangzhou", snitch.getDatacenter(local));
         assertEquals("f", snitch.getRack(local));
     }
-    
+
     @Test
     public void testNewRegions() throws IOException, ConfigurationException
     {
         az = "us-east-1a";
-        AlibabaCloudSnitch snitch = new TestAlibabaCloudSnitch();
+
+        AbstractCloudMetadataServiceConnector mock = mock(AbstractCloudMetadataServiceConnector.class);
+        when(mock.apiCall(any(), anyMap())).thenReturn(az);
+
+        AlibabaCloudSnitch snitch = new AlibabaCloudSnitch(new SnitchProperties(new Properties()), mock);
         InetAddressAndPort local = InetAddressAndPort.getByName("127.0.0.1");
         assertEquals("us-east", snitch.getDatacenter(local));
         assertEquals("1a", snitch.getRack(local));
@@ -106,5 +105,4 @@
     {
         StorageService.instance.stopClient();
     }
-    
 }
diff --git a/test/unit/org/apache/cassandra/locator/CloudstackSnitchTest.java b/test/unit/org/apache/cassandra/locator/CloudstackSnitchTest.java
index e167e99..5b5b569 100644
--- a/test/unit/org/apache/cassandra/locator/CloudstackSnitchTest.java
+++ b/test/unit/org/apache/cassandra/locator/CloudstackSnitchTest.java
@@ -21,6 +21,7 @@
 import java.io.IOException;
 import java.util.EnumMap;
 import java.util.Map;
+import java.util.Properties;
 
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -39,6 +40,9 @@
 import static org.apache.cassandra.ServerTestUtils.mkdirs;
 import static org.apache.cassandra.config.CassandraRelevantProperties.GOSSIP_DISABLE_THREAD_VALIDATION;
 import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public class CloudstackSnitchTest
 {
@@ -57,31 +61,14 @@
         StorageService.instance.initServer(0);
     }
 
-    private class TestCloudstackSnitch extends CloudstackSnitch
-    {
-        public TestCloudstackSnitch() throws IOException, ConfigurationException
-        {
-            super();
-        }
-
-        @Override
-        String csMetadataEndpoint() throws ConfigurationException
-        {
-            return "";
-        }
-
-        @Override
-        String csQueryMetadata(String endpoint) throws IOException, ConfigurationException
-        {
-            return az;
-        }
-    }
-
     @Test
     public void testRacks() throws IOException, ConfigurationException
     {
         az = "ch-gva-1";
-        CloudstackSnitch snitch = new TestCloudstackSnitch();
+        CloudstackSnitch.CloudstackConnector mock = mock(CloudstackSnitch.CloudstackConnector.class);
+        when(mock.apiCall(any())).thenReturn(az);
+
+        CloudstackSnitch snitch = new CloudstackSnitch(new SnitchProperties(new Properties()), mock);
         InetAddressAndPort local = InetAddressAndPort.getByName("127.0.0.1");
         InetAddressAndPort nonlocal = InetAddressAndPort.getByName("127.0.0.7");
 
@@ -96,14 +83,16 @@
 
         assertEquals("ch-gva", snitch.getDatacenter(local));
         assertEquals("1", snitch.getRack(local));
-
     }
 
     @Test
     public void testNewRegions() throws IOException, ConfigurationException
     {
         az = "ch-gva-1";
-        CloudstackSnitch snitch = new TestCloudstackSnitch();
+        CloudstackSnitch.CloudstackConnector mock = mock(CloudstackSnitch.CloudstackConnector.class);
+        when(mock.apiCall(any())).thenReturn(az);
+        CloudstackSnitch snitch = new CloudstackSnitch(new SnitchProperties(new Properties()), mock);
+
         InetAddressAndPort local = InetAddressAndPort.getByName("127.0.0.1");
 
         assertEquals("ch-gva", snitch.getDatacenter(local));
diff --git a/test/unit/org/apache/cassandra/locator/DefaultCloudMetadataServiceConnectorTest.java b/test/unit/org/apache/cassandra/locator/DefaultCloudMetadataServiceConnectorTest.java
new file mode 100644
index 0000000..9f450f9
--- /dev/null
+++ b/test/unit/org/apache/cassandra/locator/DefaultCloudMetadataServiceConnectorTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.locator;
+
+import org.junit.Test;
+
+import org.apache.cassandra.config.DurationSpec;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.locator.AbstractCloudMetadataServiceConnector.DefaultCloudMetadataServiceConnector;
+import org.apache.cassandra.utils.Pair;
+
+import static org.apache.cassandra.locator.AbstractCloudMetadataServiceConnector.DEFAULT_METADATA_REQUEST_TIMEOUT;
+import static org.apache.cassandra.locator.AbstractCloudMetadataServiceConnector.METADATA_REQUEST_TIMEOUT_PROPERTY;
+import static org.apache.cassandra.locator.AbstractCloudMetadataServiceConnector.METADATA_URL_PROPERTY;
+import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
+import static org.junit.Assert.assertEquals;
+
+public class DefaultCloudMetadataServiceConnectorTest
+{
+    @Test
+    public void testDefaultConnector()
+    {
+        assertEquals(new DurationSpec.IntMillisecondsBound(DEFAULT_METADATA_REQUEST_TIMEOUT).toMilliseconds(),
+                     new DefaultCloudMetadataServiceConnector(new SnitchProperties(Pair.create(METADATA_REQUEST_TIMEOUT_PROPERTY, DEFAULT_METADATA_REQUEST_TIMEOUT),
+                                                                                   Pair.create(METADATA_URL_PROPERTY, "http://127.0.0.1/abc"))).requestTimeoutMs);
+
+        assertEquals(0, new DefaultCloudMetadataServiceConnector(new SnitchProperties(Pair.create(METADATA_REQUEST_TIMEOUT_PROPERTY, "0s"), Pair.create(METADATA_URL_PROPERTY, "http://127.0.0.1/abc"))).requestTimeoutMs);
+        assertEquals(30000, new DefaultCloudMetadataServiceConnector(new SnitchProperties(Pair.create(METADATA_URL_PROPERTY, "http://127.0.0.1/abc"))).requestTimeoutMs);
+    }
+
+    @Test
+    public void testInvalidMetadataURL()
+    {
+        assertThatExceptionOfType(ConfigurationException.class)
+        .isThrownBy(() -> new DefaultCloudMetadataServiceConnector(new SnitchProperties(Pair.create(METADATA_URL_PROPERTY, "http:"))))
+        .withMessage("Snitch metadata service URL 'http:' is invalid. Please review snitch properties defined in the configured 'cassandra-rackdc.properties' configuration file.");
+    }
+
+    @Test
+    public void testInvalidTimeout()
+    {
+        assertThatExceptionOfType(ConfigurationException.class)
+        .isThrownBy(() -> new DefaultCloudMetadataServiceConnector(new SnitchProperties(Pair.create(METADATA_REQUEST_TIMEOUT_PROPERTY, "1x"),
+                                                                                        Pair.create(METADATA_URL_PROPERTY, "http://127.0.0.1/abc"))))
+        .withMessage("1x as value of metadata_request_timeout is invalid duration! Invalid duration: 1x Accepted units:[MILLISECONDS, SECONDS, MINUTES, HOURS, DAYS] where case matters and only non-negative values.");
+    }
+}
diff --git a/test/unit/org/apache/cassandra/locator/GoogleCloudSnitchTest.java b/test/unit/org/apache/cassandra/locator/GoogleCloudSnitchTest.java
index 1fdfd1c..63270f4 100644
--- a/test/unit/org/apache/cassandra/locator/GoogleCloudSnitchTest.java
+++ b/test/unit/org/apache/cassandra/locator/GoogleCloudSnitchTest.java
@@ -22,6 +22,7 @@
 import java.io.IOException;
 import java.util.EnumMap;
 import java.util.Map;
+import java.util.Properties;
 
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -40,11 +41,13 @@
 import static org.apache.cassandra.ServerTestUtils.mkdirs;
 import static org.apache.cassandra.config.CassandraRelevantProperties.GOSSIP_DISABLE_THREAD_VALIDATION;
 import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyMap;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public class GoogleCloudSnitchTest
 {
-    private static String az;
-
     @BeforeClass
     public static void setup() throws Exception
     {
@@ -58,25 +61,15 @@
         StorageService.instance.initServer(0);
     }
 
-    private class TestGoogleCloudSnitch extends GoogleCloudSnitch
-    {
-        public TestGoogleCloudSnitch() throws IOException, ConfigurationException
-        {
-            super();
-        }
-
-        @Override
-        String gceApiCall(String url) throws IOException, ConfigurationException
-        {
-            return az;
-        }
-    }
-
     @Test
     public void testRac() throws IOException, ConfigurationException
     {
-        az = "us-central1-a";
-        GoogleCloudSnitch snitch = new TestGoogleCloudSnitch();
+        String az = "us-central1-a";
+
+        AbstractCloudMetadataServiceConnector mock = mock(AbstractCloudMetadataServiceConnector.class);
+        when(mock.apiCall(any(), anyMap())).thenReturn(az);
+
+        GoogleCloudSnitch snitch = new GoogleCloudSnitch(new SnitchProperties(new Properties()), mock);
         InetAddressAndPort local = InetAddressAndPort.getByName("127.0.0.1");
         InetAddressAndPort nonlocal = InetAddressAndPort.getByName("127.0.0.7");
 
@@ -96,8 +89,11 @@
     @Test
     public void testNewRegions() throws IOException, ConfigurationException
     {
-        az = "asia-east1-a";
-        GoogleCloudSnitch snitch = new TestGoogleCloudSnitch();
+        String az = "asia-east1-a";
+        AbstractCloudMetadataServiceConnector mock = mock(AbstractCloudMetadataServiceConnector.class);
+        when(mock.apiCall(any(), anyMap())).thenReturn(az);
+
+        GoogleCloudSnitch snitch = new GoogleCloudSnitch(new SnitchProperties(new Properties()), mock);
         InetAddressAndPort local = InetAddressAndPort.getByName("127.0.0.1");
         assertEquals("asia-east1", snitch.getDatacenter(local));
         assertEquals("a", snitch.getRack(local));
diff --git a/test/unit/org/apache/cassandra/locator/SnitchUtilsTest.java b/test/unit/org/apache/cassandra/locator/SnitchUtilsTest.java
new file mode 100644
index 0000000..b5b1400
--- /dev/null
+++ b/test/unit/org/apache/cassandra/locator/SnitchUtilsTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.locator;
+
+import org.junit.Test;
+
+import org.apache.cassandra.utils.Pair;
+
+import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
+import static org.junit.Assert.assertEquals;
+
+public class SnitchUtilsTest
+{
+    @Test
+    public void testSnitchUtils()
+    {
+        Pair<String, String> result = SnitchUtils.parseDcAndRack("my-dc-rack1", "");
+        assertEquals("my-dc", result.left);
+        assertEquals("rack1", result.right);
+
+        result = SnitchUtils.parseDcAndRack("my-rack", "");
+        assertEquals("my", result.left);
+        assertEquals("rack", result.right);
+
+        assertThatExceptionOfType(IllegalStateException.class)
+        .isThrownBy(() -> SnitchUtils.parseDcAndRack("myresponse", ""))
+        .withMessage("myresponse does not contain at least one '-' to differentiate between datacenter and rack");
+    }
+}