Remove BackPressureStrategy

patch by Benedict; reviewed by Sergio Bossa and Robert Stupp for CASSANDRA-15375
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 4bd72cf..573c936 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -1253,28 +1253,6 @@
 # as corrupted. This should be positive and less than 2048.
 # max_value_size_in_mb: 256
 
-# Back-pressure settings #
-# If enabled, the coordinator will apply the back-pressure strategy specified below to each mutation
-# sent to replicas, with the aim of reducing pressure on overloaded replicas.
-back_pressure_enabled: false
-# The back-pressure strategy applied.
-# The default implementation, RateBasedBackPressure, takes three arguments:
-# high ratio, factor, and flow type, and uses the ratio between incoming mutation responses and outgoing mutation requests.
-# If below high ratio, outgoing mutations are rate limited according to the incoming rate decreased by the given factor;
-# if above high ratio, the rate limiting is increased by the given factor;
-# such factor is usually best configured between 1 and 10, use larger values for a faster recovery
-# at the expense of potentially more dropped mutations;
-# the rate limiting is applied according to the flow type: if FAST, it's rate limited at the speed of the fastest replica,
-# if SLOW at the speed of the slowest one.
-# New strategies can be added. Implementors need to implement org.apache.cassandra.net.BackpressureStrategy and
-# provide a public constructor accepting a Map<String, Object>.
-back_pressure_strategy:
-    - class_name: org.apache.cassandra.net.RateBasedBackPressure
-      parameters:
-        - high_ratio: 0.90
-          factor: 5
-          flow: FAST
-
 # Coalescing Strategies #
 # Coalescing multiples messages turns out to significantly boost message processing throughput (think doubling or more).
 # On bare metal, the floor for packet processing throughput is high enough that many applications won't notice, but in
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 63cfff4..c8af291 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -412,7 +412,9 @@
      */
     public UserFunctionTimeoutPolicy user_function_timeout_policy = UserFunctionTimeoutPolicy.die;
 
+    @Deprecated
     public volatile boolean back_pressure_enabled = false;
+    @Deprecated
     public volatile ParameterizedClass back_pressure_strategy;
 
     public volatile int concurrent_validations;
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 5f94710..a7559f1 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -68,8 +68,6 @@
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.locator.Replica;
 import org.apache.cassandra.locator.SeedProvider;
-import org.apache.cassandra.net.BackPressureStrategy;
-import org.apache.cassandra.net.RateBasedBackPressure;
 import org.apache.cassandra.security.EncryptionContext;
 import org.apache.cassandra.security.SSLFactory;
 import org.apache.cassandra.service.CacheService.CacheType;
@@ -137,7 +135,6 @@
     private static EncryptionContext encryptionContext;
     private static boolean hasLoggedConfig;
 
-    private static BackPressureStrategy backPressureStrategy;
     private static DiskOptimizationStrategy diskOptimizationStrategy;
 
     private static boolean clientInitialized;
@@ -783,27 +780,6 @@
                 break;
         }
 
-        try
-        {
-            ParameterizedClass strategy = conf.back_pressure_strategy != null ? conf.back_pressure_strategy : RateBasedBackPressure.withDefaultParams();
-            Class<?> clazz = Class.forName(strategy.class_name);
-            if (!BackPressureStrategy.class.isAssignableFrom(clazz))
-                throw new ConfigurationException(strategy + " is not an instance of " + BackPressureStrategy.class.getCanonicalName(), false);
-
-            Constructor<?> ctor = clazz.getConstructor(Map.class);
-            BackPressureStrategy instance = (BackPressureStrategy) ctor.newInstance(strategy.parameters);
-            logger.info("Back-pressure is {} with strategy {}.", backPressureEnabled() ? "enabled" : "disabled", conf.back_pressure_strategy);
-            backPressureStrategy = instance;
-        }
-        catch (ConfigurationException ex)
-        {
-            throw ex;
-        }
-        catch (Exception ex)
-        {
-            throw new ConfigurationException("Error configuring back-pressure strategy: " + conf.back_pressure_strategy, ex);
-        }
-
         if (conf.otc_coalescing_enough_coalesced_messages > 128)
             throw new ConfigurationException("otc_coalescing_enough_coalesced_messages must be smaller than 128", false);
 
@@ -2872,16 +2848,6 @@
         return unsafeSystem;
     }
 
-    public static void setBackPressureEnabled(boolean backPressureEnabled)
-    {
-        conf.back_pressure_enabled = backPressureEnabled;
-    }
-
-    public static boolean backPressureEnabled()
-    {
-        return conf.back_pressure_enabled;
-    }
-
     public static boolean diagnosticEventsEnabled()
     {
         return conf.diagnostic_events_enabled;
@@ -2892,17 +2858,6 @@
         conf.diagnostic_events_enabled = enabled;
     }
 
-    @VisibleForTesting
-    public static void setBackPressureStrategy(BackPressureStrategy strategy)
-    {
-        backPressureStrategy = strategy;
-    }
-
-    public static BackPressureStrategy getBackPressureStrategy()
-    {
-        return backPressureStrategy;
-    }
-
     public static ConsistencyLevel getIdealConsistencyLevel()
     {
         return conf.ideal_consistency_level;
diff --git a/src/java/org/apache/cassandra/hints/HintsDispatcher.java b/src/java/org/apache/cassandra/hints/HintsDispatcher.java
index 39e4b25..743b275 100644
--- a/src/java/org/apache/cassandra/hints/HintsDispatcher.java
+++ b/src/java/org/apache/cassandra/hints/HintsDispatcher.java
@@ -255,11 +255,5 @@
             outcome = Outcome.SUCCESS;
             condition.signalAll();
         }
-
-        @Override
-        public boolean supportsBackPressure()
-        {
-            return true;
-        }
     }
 }
diff --git a/src/java/org/apache/cassandra/net/BackPressureState.java b/src/java/org/apache/cassandra/net/BackPressureState.java
deleted file mode 100644
index de19bf3..0000000
--- a/src/java/org/apache/cassandra/net/BackPressureState.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.net;
-
-import org.apache.cassandra.locator.InetAddressAndPort;
-
-/**
- * Interface meant to track the back-pressure state per replica host.
- */
-public interface BackPressureState
-{
-    /**
-     * Called when a message is sent to a replica.
-     */
-    void onMessageSent(Message<?> message);
-
-    /**
-     * Called when a response is received from a replica.
-     */
-    void onResponseReceived();
-
-    /**
-     * Called when no response is received from replica.
-     */
-    void onResponseTimeout();
-
-    /**
-     * Gets the current back-pressure rate limit.
-     */
-    double getBackPressureRateLimit();
-
-    /**
-     * Returns the host this state refers to.
-     */
-    InetAddressAndPort getHost();
-}
diff --git a/src/java/org/apache/cassandra/net/BackPressureStrategy.java b/src/java/org/apache/cassandra/net/BackPressureStrategy.java
deleted file mode 100644
index 6b49495..0000000
--- a/src/java/org/apache/cassandra/net/BackPressureStrategy.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.net;
-
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.cassandra.locator.InetAddressAndPort;
-
-/**
- * Back-pressure algorithm interface.
- * <p>
- * For experts usage only. Implementors must provide a constructor accepting a single {@code Map<String, Object>} argument,
- * representing any parameters eventually required by the specific implementation.
- * </p>
- */
-public interface BackPressureStrategy<S extends BackPressureState>
-{
-    /**
-     * Applies the back-pressure algorithm, based and acting on the given {@link BackPressureState}s, and up to the given
-     * timeout.
-     */
-    void apply(Set<S> states, long timeout, TimeUnit unit);
-
-    /**
-     * Creates a new {@link BackPressureState} initialized as needed by the specific implementation.
-     */
-    S newState(InetAddressAndPort host);
-}
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index beec083..0827f78 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -242,9 +242,6 @@
     final ResourceLimits.Limit outboundGlobalReserveLimit =
         new ResourceLimits.Concurrent(DatabaseDescriptor.getInternodeApplicationSendQueueReserveGlobalCapacityInBytes());
 
-    // back-pressure implementation
-    private final BackPressureStrategy backPressure = DatabaseDescriptor.getBackPressureStrategy();
-
     private volatile boolean isShuttingDown;
 
     @VisibleForTesting
@@ -271,7 +268,6 @@
     public void sendWithCallback(Message message, InetAddressAndPort to, RequestCallback cb, ConnectionType specifyConnection)
     {
         callbacks.addWithExpiration(cb, message, to);
-        updateBackPressureOnSend(to, cb, message);
         if (cb.invokeOnFailure() && !message.callBackOnFailure())
             message = message.withCallBackOnFailure();
         send(message, to, specifyConnection);
@@ -292,7 +288,6 @@
     {
         assert message.callBackOnFailure();
         callbacks.addWithExpiration(handler, message, to, handler.consistencyLevel(), allowHints);
-        updateBackPressureOnSend(to.endpoint(), handler, message);
         send(message, to.endpoint(), null);
     }
 
@@ -343,74 +338,6 @@
         }
     }
 
-    /**
-     * Updates the back-pressure state on sending to the given host if enabled and the given message callback supports it.
-     *
-     * @param host The replica host the back-pressure state refers to.
-     * @param callback The message callback.
-     * @param message The actual message.
-     */
-    void updateBackPressureOnSend(InetAddressAndPort host, RequestCallback callback, Message<?> message)
-    {
-        if (DatabaseDescriptor.backPressureEnabled() && callback.supportsBackPressure())
-        {
-            BackPressureState backPressureState = getBackPressureState(host);
-            if (backPressureState != null)
-                backPressureState.onMessageSent(message);
-        }
-    }
-
-    /**
-     * Updates the back-pressure state on reception from the given host if enabled and the given message callback supports it.
-     *
-     * @param host The replica host the back-pressure state refers to.
-     * @param callback The message callback.
-     * @param timeout True if updated following a timeout, false otherwise.
-     */
-    void updateBackPressureOnReceive(InetAddressAndPort host, RequestCallback callback, boolean timeout)
-    {
-        if (DatabaseDescriptor.backPressureEnabled() && callback.supportsBackPressure())
-        {
-            BackPressureState backPressureState = getBackPressureState(host);
-            if (backPressureState == null)
-                return;
-            if (!timeout)
-                backPressureState.onResponseReceived();
-            else
-                backPressureState.onResponseTimeout();
-        }
-    }
-
-    /**
-     * Applies back-pressure for the given hosts, according to the configured strategy.
-     *
-     * If the local host is present, it is removed from the pool, as back-pressure is only applied
-     * to remote hosts.
-     *
-     * @param hosts The hosts to apply back-pressure to.
-     * @param timeoutInNanos The max back-pressure timeout.
-     */
-    public void applyBackPressure(Iterable<InetAddressAndPort> hosts, long timeoutInNanos)
-    {
-        if (DatabaseDescriptor.backPressureEnabled())
-        {
-            Set<BackPressureState> states = new HashSet<>();
-            for (InetAddressAndPort host : hosts)
-            {
-                if (host.equals(FBUtilities.getBroadcastAddressAndPort()))
-                    continue;
-                states.add(getOutbound(host).getBackPressureState());
-            }
-            //noinspection unchecked
-            backPressure.apply(states, timeoutInNanos, NANOSECONDS);
-        }
-    }
-
-    BackPressureState getBackPressureState(InetAddressAndPort host)
-    {
-        return getOutbound(host).getBackPressureState();
-    }
-
     void markExpiredCallback(InetAddressAndPort addr)
     {
         OutboundConnections conn = channelManagers.get(addr);
@@ -552,7 +479,7 @@
     {
         OutboundConnections connections = channelManagers.get(to);
         if (connections == null)
-            connections = OutboundConnections.tryRegister(channelManagers, to, new OutboundConnectionSettings(to).withDefaults(ConnectionCategory.MESSAGING), backPressure.newState(to));
+            connections = OutboundConnections.tryRegister(channelManagers, to, new OutboundConnectionSettings(to).withDefaults(ConnectionCategory.MESSAGING));
         return connections;
     }
 
diff --git a/src/java/org/apache/cassandra/net/MessagingServiceMBean.java b/src/java/org/apache/cassandra/net/MessagingServiceMBean.java
index 732a5ed..886459e 100644
--- a/src/java/org/apache/cassandra/net/MessagingServiceMBean.java
+++ b/src/java/org/apache/cassandra/net/MessagingServiceMBean.java
@@ -117,16 +117,17 @@
      */
     @Deprecated
     public Map<String, Double> getBackPressurePerHost();
-    public Map<String, Double> getBackPressurePerHostWithPort();
 
     /**
      * Enable/Disable back-pressure
      */
+    @Deprecated
     public void setBackPressureEnabled(boolean enabled);
 
     /**
      * Get back-pressure enabled state
      */
+    @Deprecated
     public boolean isBackPressureEnabled();
 
     public int getVersion(String address) throws UnknownHostException;
diff --git a/src/java/org/apache/cassandra/net/MessagingServiceMBeanImpl.java b/src/java/org/apache/cassandra/net/MessagingServiceMBeanImpl.java
index b48ae1c..bea2b8c 100644
--- a/src/java/org/apache/cassandra/net/MessagingServiceMBeanImpl.java
+++ b/src/java/org/apache/cassandra/net/MessagingServiceMBeanImpl.java
@@ -19,6 +19,7 @@
 
 import java.io.IOException;
 import java.net.UnknownHostException;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -258,33 +259,19 @@
     @Override
     public Map<String, Double> getBackPressurePerHost()
     {
-        Map<String, Double> map = new HashMap<>(channelManagers.size());
-        for (Map.Entry<InetAddressAndPort, OutboundConnections> entry : channelManagers.entrySet())
-            map.put(entry.getKey().toString(false), entry.getValue().getBackPressureState().getBackPressureRateLimit());
-
-        return map;
-    }
-
-    @Override
-    public Map<String, Double> getBackPressurePerHostWithPort()
-    {
-        Map<String, Double> map = new HashMap<>(channelManagers.size());
-        for (Map.Entry<InetAddressAndPort, OutboundConnections> entry : channelManagers.entrySet())
-            map.put(entry.getKey().toString(false), entry.getValue().getBackPressureState().getBackPressureRateLimit());
-
-        return map;
+        throw new UnsupportedOperationException("This feature has been removed");
     }
 
     @Override
     public void setBackPressureEnabled(boolean enabled)
     {
-        DatabaseDescriptor.setBackPressureEnabled(enabled);
+        throw new UnsupportedOperationException("This feature has been removed");
     }
 
     @Override
     public boolean isBackPressureEnabled()
     {
-        return DatabaseDescriptor.backPressureEnabled();
+        return false;
     }
 
     @Override
diff --git a/src/java/org/apache/cassandra/net/OutboundConnections.java b/src/java/org/apache/cassandra/net/OutboundConnections.java
index c900908..029d5e1 100644
--- a/src/java/org/apache/cassandra/net/OutboundConnections.java
+++ b/src/java/org/apache/cassandra/net/OutboundConnections.java
@@ -53,7 +53,6 @@
 
     private final SimpleCondition metricsReady = new SimpleCondition();
     private volatile InternodeOutboundMetrics metrics;
-    private final BackPressureState backPressureState;
     private final ResourceLimits.Limit reserveCapacity;
 
     private OutboundConnectionSettings template;
@@ -61,9 +60,8 @@
     public final OutboundConnection large;
     public final OutboundConnection urgent;
 
-    private OutboundConnections(OutboundConnectionSettings template, BackPressureState backPressureState)
+    private OutboundConnections(OutboundConnectionSettings template)
     {
-        this.backPressureState = backPressureState;
         this.template = template = template.withDefaultReserveLimits();
         reserveCapacity = new ResourceLimits.Concurrent(template.applicationSendQueueReserveEndpointCapacityInBytes);
         ResourceLimits.EndpointAndGlobal reserveCapacityInBytes = new ResourceLimits.EndpointAndGlobal(reserveCapacity, template.applicationSendQueueReserveGlobalCapacityInBytes);
@@ -80,12 +78,12 @@
         connectionFor(msg, type).enqueue(msg);
     }
 
-    static <K> OutboundConnections tryRegister(ConcurrentMap<K, OutboundConnections> in, K key, OutboundConnectionSettings settings, BackPressureState backPressureState)
+    static <K> OutboundConnections tryRegister(ConcurrentMap<K, OutboundConnections> in, K key, OutboundConnectionSettings settings)
     {
         OutboundConnections connections = in.get(key);
         if (connections == null)
         {
-            connections = new OutboundConnections(settings, backPressureState);
+            connections = new OutboundConnections(settings);
             OutboundConnections existing = in.putIfAbsent(key, connections);
 
             if (existing == null)
@@ -103,11 +101,6 @@
         return connections;
     }
 
-    BackPressureState getBackPressureState()
-    {
-        return backPressureState;
-    }
-
     /**
      * Reconnect to the peer using the given {@code addr}. Outstanding messages in each channel will be sent on the
      * current channel. Typically this function is used for something like EC2 public IP addresses which need to be used
@@ -313,9 +306,9 @@
     }
 
     @VisibleForTesting
-    static OutboundConnections unsafeCreate(OutboundConnectionSettings template, BackPressureState backPressureState)
+    static OutboundConnections unsafeCreate(OutboundConnectionSettings template)
     {
-        OutboundConnections connections = new OutboundConnections(template, backPressureState);
+        OutboundConnections connections = new OutboundConnections(template);
         connections.metricsReady.signalAll();
         return connections;
     }
diff --git a/src/java/org/apache/cassandra/net/RateBasedBackPressure.java b/src/java/org/apache/cassandra/net/RateBasedBackPressure.java
deleted file mode 100644
index 02d8cce..0000000
--- a/src/java/org/apache/cassandra/net/RateBasedBackPressure.java
+++ /dev/null
@@ -1,297 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.net;
-
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-import com.github.benmanes.caffeine.cache.Cache;
-import com.github.benmanes.caffeine.cache.Caffeine;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.RateLimiter;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.ParameterizedClass;
-import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.utils.NoSpamLogger;
-import org.apache.cassandra.utils.SystemTimeSource;
-import org.apache.cassandra.utils.TimeSource;
-import org.apache.cassandra.utils.concurrent.IntervalLock;
-
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-
-/**
- * Back-pressure algorithm based on rate limiting according to the ratio between incoming and outgoing rates, computed
- * over a sliding time window with size equal to write RPC timeout.
- */
-public class RateBasedBackPressure implements BackPressureStrategy<RateBasedBackPressureState>
-{
-    static final String HIGH_RATIO = "high_ratio";
-    static final String FACTOR = "factor";
-    static final String FLOW = "flow";
-    private static final String BACK_PRESSURE_HIGH_RATIO = "0.90";
-    private static final String BACK_PRESSURE_FACTOR = "5";
-    private static final String BACK_PRESSURE_FLOW = "FAST";
-
-    private static final Logger logger = LoggerFactory.getLogger(RateBasedBackPressure.class);
-    private static final NoSpamLogger tenSecsNoSpamLogger = NoSpamLogger.getLogger(logger, 10, TimeUnit.SECONDS);
-    private static final NoSpamLogger oneMinNoSpamLogger = NoSpamLogger.getLogger(logger, 1, TimeUnit.MINUTES);
-
-    protected final TimeSource timeSource;
-    protected final double highRatio;
-    protected final int factor;
-    protected final Flow flow;
-    protected final long windowSize;
-
-    private final Cache<Set<RateBasedBackPressureState>, IntervalRateLimiter> rateLimiters =
-            Caffeine.newBuilder()
-                    .expireAfterAccess(1, TimeUnit.HOURS)
-                    .executor(MoreExecutors.directExecutor())
-                    .build();
-
-    enum Flow
-    {
-        FAST,
-        SLOW
-    }
-
-    public static ParameterizedClass withDefaultParams()
-    {
-        return new ParameterizedClass(RateBasedBackPressure.class.getName(),
-                                      ImmutableMap.of(HIGH_RATIO, BACK_PRESSURE_HIGH_RATIO,
-                                                      FACTOR, BACK_PRESSURE_FACTOR,
-                                                      FLOW, BACK_PRESSURE_FLOW));
-    }
-
-    public RateBasedBackPressure(Map<String, Object> args)
-    {
-        this(args, new SystemTimeSource(), DatabaseDescriptor.getWriteRpcTimeout(MILLISECONDS));
-    }
-
-    @VisibleForTesting
-    public RateBasedBackPressure(Map<String, Object> args, TimeSource timeSource, long windowSize)
-    {
-        if (args.size() != 3)
-        {
-            throw new IllegalArgumentException(RateBasedBackPressure.class.getCanonicalName()
-                    + " requires 3 arguments: high ratio, back-pressure factor and flow type.");
-        }
-
-        try
-        {
-            highRatio = Double.parseDouble(args.getOrDefault(HIGH_RATIO, "").toString().trim());
-            factor = Integer.parseInt(args.getOrDefault(FACTOR, "").toString().trim());
-            flow = Flow.valueOf(args.getOrDefault(FLOW, "").toString().trim().toUpperCase());
-        }
-        catch (Exception ex)
-        {
-            throw new IllegalArgumentException(ex.getMessage(), ex);
-        }
-
-        if (highRatio <= 0 || highRatio > 1)
-        {
-            throw new IllegalArgumentException("Back-pressure high ratio must be > 0 and <= 1");
-        }
-        if (factor < 1)
-        {
-            throw new IllegalArgumentException("Back-pressure factor must be >= 1");
-        }
-        if (windowSize < 10)
-        {
-            throw new IllegalArgumentException("Back-pressure window size must be >= 10");
-        }
-
-        this.timeSource = timeSource;
-        this.windowSize = windowSize;
-
-        logger.info("Initialized back-pressure with high ratio: {}, factor: {}, flow: {}, window size: {}.",
-                    highRatio, factor, flow, windowSize);
-    }
-
-    @Override
-    public void apply(Set<RateBasedBackPressureState> states, long timeout, TimeUnit unit)
-    {
-        // Go through the back-pressure states, try updating each of them and collect min/max rates:
-        boolean isUpdated = false;
-        double minRateLimit = Double.POSITIVE_INFINITY;
-        double maxRateLimit = Double.NEGATIVE_INFINITY;
-        double minIncomingRate = Double.POSITIVE_INFINITY;
-        RateLimiter currentMin = null;
-        RateLimiter currentMax = null;
-        for (RateBasedBackPressureState backPressure : states)
-        {
-            // Get the incoming/outgoing rates:
-            double incomingRate = backPressure.incomingRate.get(TimeUnit.SECONDS);
-            double outgoingRate = backPressure.outgoingRate.get(TimeUnit.SECONDS);
-            // Compute the min incoming rate:
-            if (incomingRate < minIncomingRate)
-                minIncomingRate = incomingRate;
-
-            // Try acquiring the interval lock:
-            if (backPressure.tryIntervalLock(windowSize))
-            {
-                // If acquired, proceed updating thi back-pressure state rate limit:
-                isUpdated = true;
-                try
-                {
-                    RateLimiter limiter = backPressure.rateLimiter;
-
-                    // If we have sent any outgoing requests during this time window, go ahead with rate limiting
-                    // (this is safe against concurrent back-pressure state updates thanks to the rw-locking in
-                    // RateBasedBackPressureState):
-                    if (outgoingRate > 0)
-                    {
-                        // Compute the incoming/outgoing ratio:
-                        double actualRatio = incomingRate / outgoingRate;
-
-                        // If the ratio is above the high mark, try growing by the back-pressure factor:
-                        double limiterRate = limiter.getRate();
-                        if (actualRatio >= highRatio)
-                        {
-                            // Only if the outgoing rate is able to keep up with the rate increase:
-                            if (limiterRate <= outgoingRate)
-                            {
-                                double newRate = limiterRate + ((limiterRate * factor) / 100);
-                                if (newRate > 0 && newRate != Double.POSITIVE_INFINITY)
-                                {
-                                    limiter.setRate(newRate);
-                                }
-                            }
-                        }
-                        // If below, set the rate limiter at the incoming rate, decreased by factor:
-                        else
-                        {
-                            // Only if the new rate is actually less than the actual rate:
-                            double newRate = incomingRate - ((incomingRate * factor) / 100);
-                            if (newRate > 0 && newRate < limiterRate)
-                            {
-                                limiter.setRate(newRate);
-                            }
-                        }
-                        if (logger.isTraceEnabled())
-                        {
-                            logger.trace("Back-pressure state for {}: incoming rate {}, outgoing rate {}, ratio {}, rate limiting {}",
-                                         backPressure.getHost(), incomingRate, outgoingRate, actualRatio, limiter.getRate());
-                        }
-                    }
-                    // Otherwise reset the rate limiter:
-                    else
-                    {
-                        limiter.setRate(Double.POSITIVE_INFINITY);
-                    }
-
-                    // Housekeeping: pruning windows and resetting the last check timestamp!
-                    backPressure.incomingRate.prune();
-                    backPressure.outgoingRate.prune();
-                }
-                finally
-                {
-                    backPressure.releaseIntervalLock();
-                }
-            }
-            if (backPressure.rateLimiter.getRate() <= minRateLimit)
-            {
-                minRateLimit = backPressure.rateLimiter.getRate();
-                currentMin = backPressure.rateLimiter;
-            }
-            if (backPressure.rateLimiter.getRate() >= maxRateLimit)
-            {
-                maxRateLimit = backPressure.rateLimiter.getRate();
-                currentMax = backPressure.rateLimiter;
-            }
-        }
-
-        // Now find the rate limiter corresponding to the replica group represented by these back-pressure states:
-        if (!states.isEmpty())
-        {
-            // Get the rate limiter:
-            IntervalRateLimiter rateLimiter = rateLimiters.get(states, key -> new IntervalRateLimiter(timeSource));
-
-            // If the back-pressure was updated and we acquire the interval lock for the rate limiter of this group:
-            if (isUpdated && rateLimiter.tryIntervalLock(windowSize))
-            {
-                try
-                {
-                    // Update the rate limiter value based on the configured flow:
-                    if (flow.equals(Flow.FAST))
-                        rateLimiter.limiter = currentMax;
-                    else
-                        rateLimiter.limiter = currentMin;
-
-                    tenSecsNoSpamLogger.info("{} currently applied for remote replicas: {}", rateLimiter.limiter, states);
-                }
-                finally
-                {
-                    rateLimiter.releaseIntervalLock();
-                }
-            }
-            // Assigning a single rate limiter per replica group once per window size allows the back-pressure rate
-            // limiting to be stable within the group itself.
-
-            // Finally apply the rate limit with a max pause time equal to the provided timeout minus the
-            // response time computed from the incoming rate, to reduce the number of client timeouts by taking into
-            // account how long it could take to process responses after back-pressure:
-            long responseTimeInNanos = (long) (TimeUnit.NANOSECONDS.convert(1, TimeUnit.SECONDS) / minIncomingRate);
-            doRateLimit(rateLimiter.limiter, Math.max(0, TimeUnit.NANOSECONDS.convert(timeout, unit) - responseTimeInNanos));
-        }
-    }
-
-    @Override
-    public RateBasedBackPressureState newState(InetAddressAndPort host)
-    {
-        return new RateBasedBackPressureState(host, timeSource, windowSize);
-    }
-
-    @VisibleForTesting
-    RateLimiter getRateLimiterForReplicaGroup(Set<RateBasedBackPressureState> states)
-    {
-        IntervalRateLimiter rateLimiter = rateLimiters.getIfPresent(states);
-        return rateLimiter != null ? rateLimiter.limiter : RateLimiter.create(Double.POSITIVE_INFINITY);
-    }
-
-    @VisibleForTesting
-    boolean doRateLimit(RateLimiter rateLimiter, long timeoutInNanos)
-    {
-        if (!rateLimiter.tryAcquire(1, timeoutInNanos, TimeUnit.NANOSECONDS))
-        {
-            timeSource.sleepUninterruptibly(timeoutInNanos, TimeUnit.NANOSECONDS);
-            oneMinNoSpamLogger.info("Cannot apply {} due to exceeding write timeout, pausing {} nanoseconds instead.",
-                                    rateLimiter, timeoutInNanos);
-
-            return false;
-        }
-
-        return true;
-    }
-
-    private static class IntervalRateLimiter extends IntervalLock
-    {
-        public volatile RateLimiter limiter = RateLimiter.create(Double.POSITIVE_INFINITY);
-
-        IntervalRateLimiter(TimeSource timeSource)
-        {
-            super(timeSource);
-        }
-    }
-}
diff --git a/src/java/org/apache/cassandra/net/RateBasedBackPressureState.java b/src/java/org/apache/cassandra/net/RateBasedBackPressureState.java
deleted file mode 100644
index a150874..0000000
--- a/src/java/org/apache/cassandra/net/RateBasedBackPressureState.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.net;
-
-import java.util.concurrent.TimeUnit;
-
-import com.google.common.util.concurrent.RateLimiter;
-
-import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.utils.SlidingTimeRate;
-import org.apache.cassandra.utils.TimeSource;
-import org.apache.cassandra.utils.concurrent.IntervalLock;
-
-/**
- * The rate-based back-pressure state, tracked per replica host.
- * <p>
- * This back-pressure state is made up of the following attributes:
- * <ul>
- * <li>windowSize: the length of the back-pressure window in milliseconds.</li>
- * <li>incomingRate: the rate of back-pressure supporting incoming messages.</li>
- * <li>outgoingRate: the rate of back-pressure supporting outgoing messages.</li>
- * <li>rateLimiter: the rate limiter to eventually apply to outgoing messages.</li>
- * </ul>
- * The incomingRate and outgoingRate are updated together when a response is received to guarantee consistency between
- * the two.
- * <p>
- * It also provides methods to exclusively lock/release back-pressure windows at given intervals;
- * this allows to apply back-pressure even under concurrent modifications. Please also note a read lock is acquired
- * during response processing so that no concurrent rate updates can screw rate computations.
- * </p>
- */
-class RateBasedBackPressureState extends IntervalLock implements BackPressureState
-{
-    private final InetAddressAndPort host;
-    final SlidingTimeRate incomingRate;
-    final SlidingTimeRate outgoingRate;
-    final RateLimiter rateLimiter;
-
-    RateBasedBackPressureState(InetAddressAndPort host, TimeSource timeSource, long windowSize)
-    {
-        super(timeSource);
-        this.host = host;
-        this.incomingRate = new SlidingTimeRate(timeSource, windowSize, windowSize / 10, TimeUnit.MILLISECONDS);
-        this.outgoingRate = new SlidingTimeRate(timeSource, windowSize, windowSize / 10, TimeUnit.MILLISECONDS);
-        this.rateLimiter = RateLimiter.create(Double.POSITIVE_INFINITY);
-    }
-
-    @Override
-    public void onMessageSent(Message<?> message) {}
-
-    @Override
-    public void onResponseReceived()
-    {
-        readLock().lock();
-        try
-        {
-            incomingRate.update(1);
-            outgoingRate.update(1);
-        }
-        finally
-        {
-            readLock().unlock();
-        }
-    }
-
-    @Override
-    public void onResponseTimeout()
-    {
-        readLock().lock();
-        try
-        {
-            outgoingRate.update(1);
-        }
-        finally
-        {
-            readLock().unlock();
-        }
-    }
-
-    @Override
-    public double getBackPressureRateLimit()
-    {
-        return rateLimiter.getRate();
-    }
-
-    @Override
-    public InetAddressAndPort getHost()
-    {
-        return host;
-    }
-
-    @Override
-    public boolean equals(Object obj)
-    {
-        if (obj instanceof RateBasedBackPressureState)
-        {
-            RateBasedBackPressureState other = (RateBasedBackPressureState) obj;
-            return this.host.equals(other.host);
-        }
-        return false;
-    }
-
-    @Override
-    public int hashCode()
-    {
-        return this.host.hashCode();
-    }
-
-    @Override
-    public String toString()
-    {
-        return String.format("[host: %s, incoming rate: %.3f, outgoing rate: %.3f, rate limit: %.3f]",
-                             host, incomingRate.get(TimeUnit.SECONDS), outgoingRate.get(TimeUnit.SECONDS), rateLimiter.getRate());
-    }
-}
diff --git a/src/java/org/apache/cassandra/net/RequestCallback.java b/src/java/org/apache/cassandra/net/RequestCallback.java
index 9ed3a4b..5bbe011 100644
--- a/src/java/org/apache/cassandra/net/RequestCallback.java
+++ b/src/java/org/apache/cassandra/net/RequestCallback.java
@@ -56,9 +56,4 @@
     {
         return false;
     }
-
-    default boolean supportsBackPressure()
-    {
-        return false;
-    }
 }
diff --git a/src/java/org/apache/cassandra/net/RequestCallbacks.java b/src/java/org/apache/cassandra/net/RequestCallbacks.java
index d5424ed..94044e1 100644
--- a/src/java/org/apache/cassandra/net/RequestCallbacks.java
+++ b/src/java/org/apache/cassandra/net/RequestCallbacks.java
@@ -167,9 +167,6 @@
         InternodeOutboundMetrics.totalExpiredCallbacks.mark();
         messagingService.markExpiredCallback(info.peer);
 
-        if (info.callback.supportsBackPressure())
-            messagingService.updateBackPressureOnReceive(info.peer, info.callback, true);
-
         if (info.invokeOnFailure())
             INTERNAL_RESPONSE.submit(() -> info.callback.onFailure(info.peer, RequestFailureReason.TIMEOUT));
 
diff --git a/src/java/org/apache/cassandra/net/ResponseVerbHandler.java b/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
index e5779ab..369e5f4 100644
--- a/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
+++ b/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
@@ -57,10 +57,5 @@
             MessagingService.instance().latencySubscribers.maybeAdd(cb, message.from(), latencyNanos, NANOSECONDS);
             cb.onResponse(message);
         }
-
-        if (callbackInfo.callback.supportsBackPressure())
-        {
-            MessagingService.instance().updateBackPressureOnReceive(message.from(), cb, false);
-        }
     }
 }
diff --git a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
index b1eb5b3..4f384a4 100644
--- a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
@@ -63,7 +63,6 @@
     private volatile int failures = 0;
     private final Map<InetAddressAndPort, RequestFailureReason> failureReasonByEndpoint;
     private final long queryStartNanoTime;
-    private volatile boolean supportsBackPressure = true;
 
     /**
       * Delegate to another WriteResponseHandler or possibly this one to track if the ideal consistency level was reached.
@@ -270,17 +269,6 @@
         return true;
     }
 
-    @Override
-    public boolean supportsBackPressure()
-    {
-        return supportsBackPressure;
-    }
-
-    public void setSupportsBackPressure(boolean supportsBackPressure)
-    {
-        this.supportsBackPressure = supportsBackPressure;
-    }
-
     /**
      * Decrement the counter for all responses/expirations and if the counter
      * hits 0 check to see if the ideal consistency level (this write response handler)
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 0aedd3d..a5a6e3c 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -617,7 +617,6 @@
         {
             AbstractReplicationStrategy rs = keyspace.getReplicationStrategy();
             responseHandler = rs.getWriteResponseHandler(replicaPlan, null, WriteType.SIMPLE, queryStartNanoTime);
-            responseHandler.setSupportsBackPressure(false);
         }
 
         Message<Commit> message = Message.outWithFlag(PAXOS_COMMIT_REQ, proposal, MessageFlag.CALL_BACK_ON_FAILURE);
@@ -1339,9 +1338,6 @@
             }
         }
 
-        if (backPressureHosts != null)
-            MessagingService.instance().applyBackPressure(backPressureHosts, responseHandler.currentTimeoutNanos());
-
         if (endpointsToHint != null)
             submitHint(mutation, EndpointsForToken.copyOf(mutation.key().getToken(), endpointsToHint), responseHandler);
 
diff --git a/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java b/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
index c43d622..576862b 100644
--- a/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
+++ b/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
@@ -140,7 +140,6 @@
     "org.apache.cassandra.locator.Replica",
     "org.apache.cassandra.locator.SimpleSeedProvider",
     "org.apache.cassandra.locator.SeedProvider",
-    "org.apache.cassandra.net.BackPressureStrategy",
     "org.apache.cassandra.security.EncryptionContext",
     "org.apache.cassandra.service.CacheService$CacheType",
     "org.apache.cassandra.utils.binlog.BinLogOptions",
diff --git a/test/unit/org/apache/cassandra/locator/ReconnectableSnitchHelperTest.java b/test/unit/org/apache/cassandra/locator/ReconnectableSnitchHelperTest.java
index b1c3775..0c2f272 100644
--- a/test/unit/org/apache/cassandra/locator/ReconnectableSnitchHelperTest.java
+++ b/test/unit/org/apache/cassandra/locator/ReconnectableSnitchHelperTest.java
@@ -38,7 +38,6 @@
     public static void beforeClass() throws UnknownHostException
     {
         DatabaseDescriptor.daemonInitialization();
-        DatabaseDescriptor.setBackPressureStrategy(new MessagingServiceTest.MockBackPressureStrategy(Collections.emptyMap()));
         DatabaseDescriptor.setBroadcastAddress(InetAddress.getByName("127.0.0.1"));
     }
 
diff --git a/test/unit/org/apache/cassandra/net/MessagingServiceTest.java b/test/unit/org/apache/cassandra/net/MessagingServiceTest.java
index 9ce041b..7bae3fa 100644
--- a/test/unit/org/apache/cassandra/net/MessagingServiceTest.java
+++ b/test/unit/org/apache/cassandra/net/MessagingServiceTest.java
@@ -85,7 +85,6 @@
     {
         DatabaseDescriptor.daemonInitialization();
         CommitLog.instance.start();
-        DatabaseDescriptor.setBackPressureStrategy(new MockBackPressureStrategy(Collections.emptyMap()));
         DatabaseDescriptor.setBroadcastAddress(InetAddress.getByName("127.0.0.1"));
         originalAuthenticator = DatabaseDescriptor.getInternodeAuthenticator();
         originalServerEncryptionOptions = DatabaseDescriptor.getInternodeMessagingEncyptionOptions();
@@ -98,7 +97,6 @@
     public void before() throws UnknownHostException
     {
         messagingService.metrics.resetDroppedMessages(Integer.toString(metricScopeId++));
-        MockBackPressureStrategy.applied = false;
         messagingService.closeOutbound(InetAddressAndPort.getByName("127.0.0.2"));
         messagingService.closeOutbound(InetAddressAndPort.getByName("127.0.0.3"));
     }
@@ -213,195 +211,11 @@
         assertNull(queueWaitLatency.get(verb));
     }
 
-    @Test
-    public void testUpdatesBackPressureOnSendWhenEnabledAndWithSupportedCallback() throws UnknownHostException
-    {
-        MockBackPressureStrategy.MockBackPressureState backPressureState = (MockBackPressureStrategy.MockBackPressureState) messagingService.getBackPressureState(InetAddressAndPort.getByName("127.0.0.2"));
-        RequestCallback bpCallback = new BackPressureCallback();
-        RequestCallback noCallback = new NoBackPressureCallback();
-        Message<?> ignored = null;
-
-        DatabaseDescriptor.setBackPressureEnabled(true);
-        messagingService.updateBackPressureOnSend(InetAddressAndPort.getByName("127.0.0.2"), noCallback, ignored);
-        assertFalse(backPressureState.onSend);
-
-        DatabaseDescriptor.setBackPressureEnabled(false);
-        messagingService.updateBackPressureOnSend(InetAddressAndPort.getByName("127.0.0.2"), bpCallback, ignored);
-        assertFalse(backPressureState.onSend);
-
-        DatabaseDescriptor.setBackPressureEnabled(true);
-        messagingService.updateBackPressureOnSend(InetAddressAndPort.getByName("127.0.0.2"), bpCallback, ignored);
-        assertTrue(backPressureState.onSend);
-    }
-
-    @Test
-    public void testUpdatesBackPressureOnReceiveWhenEnabledAndWithSupportedCallback() throws UnknownHostException
-    {
-        MockBackPressureStrategy.MockBackPressureState backPressureState = (MockBackPressureStrategy.MockBackPressureState) messagingService.getBackPressureState(InetAddressAndPort.getByName("127.0.0.2"));
-        RequestCallback bpCallback = new BackPressureCallback();
-        RequestCallback noCallback = new NoBackPressureCallback();
-        boolean timeout = false;
-
-        DatabaseDescriptor.setBackPressureEnabled(true);
-        messagingService.updateBackPressureOnReceive(InetAddressAndPort.getByName("127.0.0.2"), noCallback, timeout);
-        assertFalse(backPressureState.onReceive);
-        assertFalse(backPressureState.onTimeout);
-
-        DatabaseDescriptor.setBackPressureEnabled(false);
-        messagingService.updateBackPressureOnReceive(InetAddressAndPort.getByName("127.0.0.2"), bpCallback, timeout);
-        assertFalse(backPressureState.onReceive);
-        assertFalse(backPressureState.onTimeout);
-
-        DatabaseDescriptor.setBackPressureEnabled(true);
-        messagingService.updateBackPressureOnReceive(InetAddressAndPort.getByName("127.0.0.2"), bpCallback, timeout);
-        assertTrue(backPressureState.onReceive);
-        assertFalse(backPressureState.onTimeout);
-    }
-
-    @Test
-    public void testUpdatesBackPressureOnTimeoutWhenEnabledAndWithSupportedCallback() throws UnknownHostException
-    {
-        MockBackPressureStrategy.MockBackPressureState backPressureState = (MockBackPressureStrategy.MockBackPressureState) messagingService.getBackPressureState(InetAddressAndPort.getByName("127.0.0.2"));
-        RequestCallback bpCallback = new BackPressureCallback();
-        RequestCallback noCallback = new NoBackPressureCallback();
-        boolean timeout = true;
-
-        DatabaseDescriptor.setBackPressureEnabled(true);
-        messagingService.updateBackPressureOnReceive(InetAddressAndPort.getByName("127.0.0.2"), noCallback, timeout);
-        assertFalse(backPressureState.onReceive);
-        assertFalse(backPressureState.onTimeout);
-
-        DatabaseDescriptor.setBackPressureEnabled(false);
-        messagingService.updateBackPressureOnReceive(InetAddressAndPort.getByName("127.0.0.2"), bpCallback, timeout);
-        assertFalse(backPressureState.onReceive);
-        assertFalse(backPressureState.onTimeout);
-
-        DatabaseDescriptor.setBackPressureEnabled(true);
-        messagingService.updateBackPressureOnReceive(InetAddressAndPort.getByName("127.0.0.2"), bpCallback, timeout);
-        assertFalse(backPressureState.onReceive);
-        assertTrue(backPressureState.onTimeout);
-    }
-
-    @Test
-    public void testAppliesBackPressureWhenEnabled() throws UnknownHostException
-    {
-        DatabaseDescriptor.setBackPressureEnabled(false);
-        messagingService.applyBackPressure(Arrays.asList(InetAddressAndPort.getByName("127.0.0.2")), ONE_SECOND);
-        assertFalse(MockBackPressureStrategy.applied);
-
-        DatabaseDescriptor.setBackPressureEnabled(true);
-        messagingService.applyBackPressure(Arrays.asList(InetAddressAndPort.getByName("127.0.0.2")), ONE_SECOND);
-        assertTrue(MockBackPressureStrategy.applied);
-    }
-
-    @Test
-    public void testDoesntApplyBackPressureToBroadcastAddress() throws UnknownHostException
-    {
-        DatabaseDescriptor.setBackPressureEnabled(true);
-        messagingService.applyBackPressure(Arrays.asList(InetAddressAndPort.getByName("127.0.0.1")), ONE_SECOND);
-        assertFalse(MockBackPressureStrategy.applied);
-    }
-
     private static void addDCLatency(long sentAt, long nowTime) throws IOException
     {
         MessagingService.instance().metrics.internodeLatencyRecorder(InetAddressAndPort.getLocalHost()).accept(nowTime - sentAt, MILLISECONDS);
     }
 
-    public static class MockBackPressureStrategy implements BackPressureStrategy<MockBackPressureStrategy.MockBackPressureState>
-    {
-        public static volatile boolean applied = false;
-
-        public MockBackPressureStrategy(Map<String, Object> args)
-        {
-        }
-
-        @Override
-        public void apply(Set<MockBackPressureState> states, long timeout, TimeUnit unit)
-        {
-            if (!Iterables.isEmpty(states))
-                applied = true;
-        }
-
-        @Override
-        public MockBackPressureState newState(InetAddressAndPort host)
-        {
-            return new MockBackPressureState(host);
-        }
-
-        public static class MockBackPressureState implements BackPressureState
-        {
-            private final InetAddressAndPort host;
-            public volatile boolean onSend = false;
-            public volatile boolean onReceive = false;
-            public volatile boolean onTimeout = false;
-
-            private MockBackPressureState(InetAddressAndPort host)
-            {
-                this.host = host;
-            }
-
-            @Override
-            public void onMessageSent(Message<?> message)
-            {
-                onSend = true;
-            }
-
-            @Override
-            public void onResponseReceived()
-            {
-                onReceive = true;
-            }
-
-            @Override
-            public void onResponseTimeout()
-            {
-                onTimeout = true;
-            }
-
-            @Override
-            public double getBackPressureRateLimit()
-            {
-                throw new UnsupportedOperationException("Not supported yet.");
-            }
-
-            @Override
-            public InetAddressAndPort getHost()
-            {
-                return host;
-            }
-        }
-    }
-
-    private static class BackPressureCallback implements RequestCallback
-    {
-        @Override
-        public boolean supportsBackPressure()
-        {
-            return true;
-        }
-
-        @Override
-        public void onResponse(Message msg)
-        {
-            throw new UnsupportedOperationException("Not supported.");
-        }
-    }
-
-    private static class NoBackPressureCallback implements RequestCallback
-    {
-        @Override
-        public boolean supportsBackPressure()
-        {
-            return false;
-        }
-
-        @Override
-        public void onResponse(Message msg)
-        {
-            throw new UnsupportedOperationException("Not supported.");
-        }
-    }
-
     /**
      * Make sure that if internode authenticatino fails for an outbound connection that all the code that relies
      * on getting the connection pool handles the null return
diff --git a/test/unit/org/apache/cassandra/net/OutboundConnectionsTest.java b/test/unit/org/apache/cassandra/net/OutboundConnectionsTest.java
index 82543e1..32faea3 100644
--- a/test/unit/org/apache/cassandra/net/OutboundConnectionsTest.java
+++ b/test/unit/org/apache/cassandra/net/OutboundConnectionsTest.java
@@ -39,13 +39,6 @@
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.net.BackPressureState;
-import org.apache.cassandra.net.ConnectionType;
-import org.apache.cassandra.net.Message;
-import org.apache.cassandra.net.OutboundConnectionSettings;
-import org.apache.cassandra.net.OutboundConnections;
-import org.apache.cassandra.net.PingRequest;
-import org.apache.cassandra.net.Verb;
 
 public class OutboundConnectionsTest
 {
@@ -66,8 +59,7 @@
     @Before
     public void setup()
     {
-        BackPressureState backPressureState = DatabaseDescriptor.getBackPressureStrategy().newState(REMOTE_ADDR);
-        connections = OutboundConnections.unsafeCreate(new OutboundConnectionSettings(REMOTE_ADDR), backPressureState);
+        connections = OutboundConnections.unsafeCreate(new OutboundConnectionSettings(REMOTE_ADDR));
     }
 
     @After
diff --git a/test/unit/org/apache/cassandra/net/RateBasedBackPressureTest.java b/test/unit/org/apache/cassandra/net/RateBasedBackPressureTest.java
deleted file mode 100644
index 4d1ae01..0000000
--- a/test/unit/org/apache/cassandra/net/RateBasedBackPressureTest.java
+++ /dev/null
@@ -1,408 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*    http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-package org.apache.cassandra.net;
-
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.RateLimiter;
-
-import org.junit.Test;
-
-import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.utils.TestTimeSource;
-import org.apache.cassandra.utils.TimeSource;
-
-import static org.apache.cassandra.net.RateBasedBackPressure.FACTOR;
-import static org.apache.cassandra.net.RateBasedBackPressure.FLOW;
-import static org.apache.cassandra.net.RateBasedBackPressure.HIGH_RATIO;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-public class RateBasedBackPressureTest
-{
-    @Test(expected = IllegalArgumentException.class)
-    public void testAcceptsNoLessThanThreeArguments() throws Exception
-    {
-        new RateBasedBackPressure(ImmutableMap.of(HIGH_RATIO, "1"), new TestTimeSource(), 10);
-    }
-
-    @Test(expected = IllegalArgumentException.class)
-    public void testHighRatioMustBeBiggerThanZero() throws Exception
-    {
-        new RateBasedBackPressure(ImmutableMap.of(HIGH_RATIO, "0", FACTOR, "2", FLOW, "FAST"), new TestTimeSource(), 10);
-    }
-
-    @Test(expected = IllegalArgumentException.class)
-    public void testHighRatioMustBeSmallerEqualThanOne() throws Exception
-    {
-        new RateBasedBackPressure(ImmutableMap.of(HIGH_RATIO, "2", FACTOR, "2", FLOW, "FAST"), new TestTimeSource(), 10);
-    }
-
-    @Test(expected = IllegalArgumentException.class)
-    public void testFactorMustBeBiggerEqualThanOne() throws Exception
-    {
-        new RateBasedBackPressure(ImmutableMap.of(HIGH_RATIO, "0.9", FACTOR, "0", FLOW, "FAST"), new TestTimeSource(), 10);
-    }
-
-    @Test(expected = IllegalArgumentException.class)
-    public void testWindowSizeMustBeBiggerEqualThanTen() throws Exception
-    {
-        new RateBasedBackPressure(ImmutableMap.of(HIGH_RATIO, "0.9", FACTOR, "5", FLOW, "FAST"), new TestTimeSource(), 1);
-    }
-
-    @Test
-    public void testFlowMustBeEitherFASTorSLOW() throws Exception
-    {
-        new RateBasedBackPressure(ImmutableMap.of(HIGH_RATIO, "0.9", FACTOR, "1", FLOW, "FAST"), new TestTimeSource(), 10);
-        new RateBasedBackPressure(ImmutableMap.of(HIGH_RATIO, "0.9", FACTOR, "1", FLOW, "SLOW"), new TestTimeSource(), 10);
-        try
-        {
-            new RateBasedBackPressure(ImmutableMap.of(HIGH_RATIO, "0.9", FACTOR, "1", FLOW, "WRONG"), new TestTimeSource(), 10);
-            fail("Expected to fail with wrong flow type.");
-        }
-        catch (Exception ex)
-        {
-        }
-    }
-
-    @Test
-    public void testBackPressureStateUpdates()
-    {
-        long windowSize = 6000;
-        TestTimeSource timeSource = new TestTimeSource();
-        RateBasedBackPressure strategy = new RateBasedBackPressure(ImmutableMap.of(HIGH_RATIO, "0.9", FACTOR, "10", FLOW, "FAST"), timeSource, windowSize);
-
-        RateBasedBackPressureState state = strategy.newState(InetAddressAndPort.getLoopbackAddress());
-        state.onMessageSent(null);
-        assertEquals(0, state.incomingRate.size());
-        assertEquals(0, state.outgoingRate.size());
-
-        state = strategy.newState(InetAddressAndPort.getLoopbackAddress());
-        state.onResponseReceived();
-        assertEquals(1, state.incomingRate.size());
-        assertEquals(1, state.outgoingRate.size());
-
-        state = strategy.newState(InetAddressAndPort.getLoopbackAddress());
-        state.onResponseTimeout();
-        assertEquals(0, state.incomingRate.size());
-        assertEquals(1, state.outgoingRate.size());
-    }
-
-    @Test
-    public void testBackPressureIsNotUpdatedBeyondInfinity() throws Exception
-    {
-        long windowSize = 6000;
-        TestTimeSource timeSource = new TestTimeSource();
-        RateBasedBackPressure strategy = new RateBasedBackPressure(ImmutableMap.of(HIGH_RATIO, "0.9", FACTOR, "10", FLOW, "FAST"), timeSource, windowSize);
-        RateBasedBackPressureState state = strategy.newState(InetAddressAndPort.getLoopbackAddress());
-
-        // Get initial rate:
-        double initialRate = state.rateLimiter.getRate();
-        assertEquals(Double.POSITIVE_INFINITY, initialRate, 0.0);
-
-        // Update incoming and outgoing rate equally:
-        state.incomingRate.update(1);
-        state.outgoingRate.update(1);
-
-        // Move time ahead:
-        timeSource.sleep(windowSize, TimeUnit.MILLISECONDS);
-
-        // Verify the rate doesn't change because already at infinity:
-        strategy.apply(Sets.newHashSet(state), 1, TimeUnit.SECONDS);
-        assertEquals(initialRate, state.rateLimiter.getRate(), 0.0);
-    }
-
-    @Test
-    public void testBackPressureIsUpdatedOncePerWindowSize() throws Exception
-    {
-        long windowSize = 6000;
-        TestTimeSource timeSource = new TestTimeSource();
-        RateBasedBackPressure strategy = new RateBasedBackPressure(ImmutableMap.of(HIGH_RATIO, "0.9", FACTOR, "10", FLOW, "FAST"), timeSource, windowSize);
-        RateBasedBackPressureState state = strategy.newState(InetAddressAndPort.getLoopbackAddress());
-
-        // Get initial time:
-        long current = state.getLastIntervalAcquire();
-        assertEquals(0, current);
-
-        // Update incoming and outgoing rate:
-        state.incomingRate.update(1);
-        state.outgoingRate.update(1);
-
-        // Move time ahead by window size:
-        timeSource.sleep(windowSize, TimeUnit.MILLISECONDS);
-
-        // Verify the timestamp changed:
-        strategy.apply(Sets.newHashSet(state), 1, TimeUnit.SECONDS);
-        current = state.getLastIntervalAcquire();
-        assertEquals(timeSource.currentTimeMillis(), current);
-
-        // Move time ahead by less than interval:
-        long previous = current;
-        timeSource.sleep(windowSize / 2, TimeUnit.MILLISECONDS);
-
-        // Verify the last timestamp didn't change because below the window size:
-        strategy.apply(Sets.newHashSet(state), 1, TimeUnit.SECONDS);
-        current = state.getLastIntervalAcquire();
-        assertEquals(previous, current);
-    }
-
-    @Test
-    public void testBackPressureWhenBelowHighRatio() throws Exception
-    {
-        long windowSize = 6000;
-        TestTimeSource timeSource = new TestTimeSource();
-        RateBasedBackPressure strategy = new RateBasedBackPressure(ImmutableMap.of(HIGH_RATIO, "0.9", FACTOR, "10", FLOW, "FAST"), timeSource, windowSize);
-        RateBasedBackPressureState state = strategy.newState(InetAddressAndPort.getLoopbackAddress());
-
-        // Update incoming and outgoing rate so that the ratio is 0.5:
-        state.incomingRate.update(50);
-        state.outgoingRate.update(100);
-
-        // Move time ahead:
-        timeSource.sleep(windowSize, TimeUnit.MILLISECONDS);
-
-        // Verify the rate is decreased by factor:
-        strategy.apply(Sets.newHashSet(state), 1, TimeUnit.SECONDS);
-        assertEquals(7.4, state.rateLimiter.getRate(), 0.1);
-    }
-
-    @Test
-    public void testBackPressureRateLimiterIsIncreasedAfterGoingAgainAboveHighRatio() throws Exception
-    {
-        long windowSize = 6000;
-        TestTimeSource timeSource = new TestTimeSource();
-        RateBasedBackPressure strategy = new RateBasedBackPressure(ImmutableMap.of(HIGH_RATIO, "0.9", FACTOR, "10", FLOW, "FAST"), timeSource, windowSize);
-        RateBasedBackPressureState state = strategy.newState(InetAddressAndPort.getLoopbackAddress());
-
-        // Update incoming and outgoing rate so that the ratio is 0.5:
-        state.incomingRate.update(50);
-        state.outgoingRate.update(100);
-
-        // Move time ahead:
-        timeSource.sleep(windowSize, TimeUnit.MILLISECONDS);
-
-        // Verify the rate decreased:
-        strategy.apply(Sets.newHashSet(state), 1, TimeUnit.SECONDS);
-        assertEquals(7.4, state.rateLimiter.getRate(), 0.1);
-
-        // Update incoming and outgoing rate back above high rate:
-        state.incomingRate.update(50);
-        state.outgoingRate.update(50);
-
-        // Move time ahead:
-        timeSource.sleep(windowSize, TimeUnit.MILLISECONDS);
-
-        // Verify rate limiter is increased by factor:
-        strategy.apply(Sets.newHashSet(state), 1, TimeUnit.SECONDS);
-        assertEquals(8.25, state.rateLimiter.getRate(), 0.1);
-
-        // Update incoming and outgoing rate to keep it below the limiter rate:
-        state.incomingRate.update(1);
-        state.outgoingRate.update(1);
-
-        // Move time ahead:
-        timeSource.sleep(windowSize, TimeUnit.MILLISECONDS);
-
-        // Verify rate limiter is not increased as already higher than the actual rate:
-        strategy.apply(Sets.newHashSet(state), 1, TimeUnit.SECONDS);
-        assertEquals(8.25, state.rateLimiter.getRate(), 0.1);
-    }
-
-    @Test
-    public void testBackPressureFastFlow() throws Exception
-    {
-        long windowSize = 6000;
-        TestTimeSource timeSource = new TestTimeSource();
-        TestableBackPressure strategy = new TestableBackPressure(ImmutableMap.of(HIGH_RATIO, "0.9", FACTOR, "10", FLOW, "FAST"), timeSource, windowSize);
-        RateBasedBackPressureState state1 = strategy.newState(InetAddressAndPort.getByName("127.0.0.1"));
-        RateBasedBackPressureState state2 = strategy.newState(InetAddressAndPort.getByName("127.0.0.2"));
-        RateBasedBackPressureState state3 = strategy.newState(InetAddressAndPort.getByName("127.0.0.3"));
-
-        // Update incoming and outgoing rates:
-        state1.incomingRate.update(50);
-        state1.outgoingRate.update(100);
-        state2.incomingRate.update(80); // fast
-        state2.outgoingRate.update(100);
-        state3.incomingRate.update(20);
-        state3.outgoingRate.update(100);
-
-        // Move time ahead:
-        timeSource.sleep(windowSize, TimeUnit.MILLISECONDS);
-
-        // Verify the fast replica rate limiting has been applied:
-        Set<RateBasedBackPressureState> replicaGroup = Sets.newHashSet(state1, state2, state3);
-        strategy.apply(replicaGroup, 1, TimeUnit.SECONDS);
-        assertTrue(strategy.checkAcquired());
-        assertTrue(strategy.checkApplied());
-        assertEquals(12.0, strategy.getRateLimiterForReplicaGroup(replicaGroup).getRate(), 0.1);
-    }
-
-    @Test
-    public void testBackPressureSlowFlow() throws Exception
-    {
-        long windowSize = 6000;
-        TestTimeSource timeSource = new TestTimeSource();
-        TestableBackPressure strategy = new TestableBackPressure(ImmutableMap.of(HIGH_RATIO, "0.9", FACTOR, "10", FLOW, "SLOW"), timeSource, windowSize);
-        RateBasedBackPressureState state1 = strategy.newState(InetAddressAndPort.getByName("127.0.0.1"));
-        RateBasedBackPressureState state2 = strategy.newState(InetAddressAndPort.getByName("127.0.0.2"));
-        RateBasedBackPressureState state3 = strategy.newState(InetAddressAndPort.getByName("127.0.0.3"));
-
-        // Update incoming and outgoing rates:
-        state1.incomingRate.update(50);
-        state1.outgoingRate.update(100);
-        state2.incomingRate.update(100);
-        state2.outgoingRate.update(100);
-        state3.incomingRate.update(20); // slow
-        state3.outgoingRate.update(100);
-
-        // Move time ahead:
-        timeSource.sleep(windowSize, TimeUnit.MILLISECONDS);
-
-        // Verify the slow replica rate limiting has been applied:
-        Set<RateBasedBackPressureState> replicaGroup = Sets.newHashSet(state1, state2, state3);
-        strategy.apply(replicaGroup, 1, TimeUnit.SECONDS);
-        assertTrue(strategy.checkAcquired());
-        assertTrue(strategy.checkApplied());
-        assertEquals(3.0, strategy.getRateLimiterForReplicaGroup(replicaGroup).getRate(), 0.1);
-    }
-
-    @Test
-    public void testBackPressureWithDifferentGroups() throws Exception
-    {
-        long windowSize = 6000;
-        TestTimeSource timeSource = new TestTimeSource();
-        TestableBackPressure strategy = new TestableBackPressure(ImmutableMap.of(HIGH_RATIO, "0.9", FACTOR, "10", FLOW, "SLOW"), timeSource, windowSize);
-        RateBasedBackPressureState state1 = strategy.newState(InetAddressAndPort.getByName("127.0.0.1"));
-        RateBasedBackPressureState state2 = strategy.newState(InetAddressAndPort.getByName("127.0.0.2"));
-        RateBasedBackPressureState state3 = strategy.newState(InetAddressAndPort.getByName("127.0.0.3"));
-        RateBasedBackPressureState state4 = strategy.newState(InetAddressAndPort.getByName("127.0.0.4"));
-
-        // Update incoming and outgoing rates:
-        state1.incomingRate.update(50); // this
-        state1.outgoingRate.update(100);
-        state2.incomingRate.update(100);
-        state2.outgoingRate.update(100);
-        state3.incomingRate.update(20); // this
-        state3.outgoingRate.update(100);
-        state4.incomingRate.update(80);
-        state4.outgoingRate.update(100);
-
-        // Move time ahead:
-        timeSource.sleep(windowSize, TimeUnit.MILLISECONDS);
-
-        // Verify the first group:
-        Set<RateBasedBackPressureState> replicaGroup = Sets.newHashSet(state1, state2);
-        strategy.apply(replicaGroup, 1, TimeUnit.SECONDS);
-        assertTrue(strategy.checkAcquired());
-        assertTrue(strategy.checkApplied());
-        assertEquals(7.4, strategy.getRateLimiterForReplicaGroup(replicaGroup).getRate(), 0.1);
-
-        // Verify the second group:
-        replicaGroup = Sets.newHashSet(state3, state4);
-        strategy.apply(replicaGroup, 1, TimeUnit.SECONDS);
-        assertTrue(strategy.checkAcquired());
-        assertTrue(strategy.checkApplied());
-        assertEquals(3.0, strategy.getRateLimiterForReplicaGroup(replicaGroup).getRate(), 0.1);
-    }
-
-    @Test
-    public void testBackPressurePastTimeout() throws Exception
-    {
-        long windowSize = 10000;
-        TestTimeSource timeSource = new TestTimeSource();
-        TestableBackPressure strategy = new TestableBackPressure(ImmutableMap.of(HIGH_RATIO, "0.9", FACTOR, "10", FLOW, "SLOW"), timeSource, windowSize);
-        RateBasedBackPressureState state1 = strategy.newState(InetAddressAndPort.getByName("127.0.0.1"));
-        RateBasedBackPressureState state2 = strategy.newState(InetAddressAndPort.getByName("127.0.0.2"));
-        RateBasedBackPressureState state3 = strategy.newState(InetAddressAndPort.getByName("127.0.0.3"));
-
-        // Update incoming and outgoing rates:
-        state1.incomingRate.update(5); // slow
-        state1.outgoingRate.update(100);
-        state2.incomingRate.update(100);
-        state2.outgoingRate.update(100);
-        state3.incomingRate.update(100);
-        state3.outgoingRate.update(100);
-
-        // Move time ahead:
-        timeSource.sleep(windowSize, TimeUnit.MILLISECONDS);
-
-        // Verify the slow replica rate limiting has been applied:
-        Set<RateBasedBackPressureState> replicaGroup = Sets.newHashSet(state1, state2, state3);
-        strategy.apply(replicaGroup, 4, TimeUnit.SECONDS);
-        assertTrue(strategy.checkAcquired());
-        assertTrue(strategy.checkApplied());
-        assertEquals(0.5, strategy.getRateLimiterForReplicaGroup(replicaGroup).getRate(), 0.1);
-
-        // Make one more apply call to saturate the rate limit timeout (0.5 requests per second means 2 requests span
-        // 4 seconds, but we can only make one as we have to subtract the incoming response time):
-        strategy.apply(replicaGroup, 4, TimeUnit.SECONDS);
-
-        // Now verify another call to apply doesn't acquire the rate limit because of the max timeout of 4 seconds minus
-        // 2 seconds of response time, so the time source itself sleeps two second:
-        long start = timeSource.currentTimeMillis();
-        strategy.apply(replicaGroup, 4, TimeUnit.SECONDS);
-        assertFalse(strategy.checkAcquired());
-        assertTrue(strategy.checkApplied());
-        assertEquals(TimeUnit.NANOSECONDS.convert(2, TimeUnit.SECONDS),
-                     strategy.timeout);
-        assertEquals(strategy.timeout,
-                     TimeUnit.NANOSECONDS.convert(timeSource.currentTimeMillis() - start, TimeUnit.MILLISECONDS));
-    }
-
-    public static class TestableBackPressure extends RateBasedBackPressure
-    {
-        public volatile boolean acquired = false;
-        public volatile boolean applied = false;
-        public volatile long timeout;
-
-        public TestableBackPressure(Map<String, Object> args, TimeSource timeSource, long windowSize)
-        {
-            super(args, timeSource, windowSize);
-        }
-
-        @Override
-        public boolean doRateLimit(RateLimiter rateLimiter, long timeoutInNanos)
-        {
-            acquired = super.doRateLimit(rateLimiter, timeoutInNanos);
-            applied = true;
-            timeout = timeoutInNanos;
-            return acquired;
-        }
-
-        public boolean checkAcquired()
-        {
-            boolean checked = acquired;
-            acquired = false;
-            return checked;
-        }
-
-        public boolean checkApplied()
-        {
-            boolean checked = applied;
-            applied = false;
-            return checked;
-        }
-    }
-}