Remove BackPressureStrategy
patch by Benedict; reviewed by Sergio Bossa and Robert Stupp for CASSANDRA-15375
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 4bd72cf..573c936 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -1253,28 +1253,6 @@
# as corrupted. This should be positive and less than 2048.
# max_value_size_in_mb: 256
-# Back-pressure settings #
-# If enabled, the coordinator will apply the back-pressure strategy specified below to each mutation
-# sent to replicas, with the aim of reducing pressure on overloaded replicas.
-back_pressure_enabled: false
-# The back-pressure strategy applied.
-# The default implementation, RateBasedBackPressure, takes three arguments:
-# high ratio, factor, and flow type, and uses the ratio between incoming mutation responses and outgoing mutation requests.
-# If below high ratio, outgoing mutations are rate limited according to the incoming rate decreased by the given factor;
-# if above high ratio, the rate limiting is increased by the given factor;
-# such factor is usually best configured between 1 and 10, use larger values for a faster recovery
-# at the expense of potentially more dropped mutations;
-# the rate limiting is applied according to the flow type: if FAST, it's rate limited at the speed of the fastest replica,
-# if SLOW at the speed of the slowest one.
-# New strategies can be added. Implementors need to implement org.apache.cassandra.net.BackpressureStrategy and
-# provide a public constructor accepting a Map<String, Object>.
-back_pressure_strategy:
- - class_name: org.apache.cassandra.net.RateBasedBackPressure
- parameters:
- - high_ratio: 0.90
- factor: 5
- flow: FAST
-
# Coalescing Strategies #
# Coalescing multiples messages turns out to significantly boost message processing throughput (think doubling or more).
# On bare metal, the floor for packet processing throughput is high enough that many applications won't notice, but in
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 63cfff4..c8af291 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -412,7 +412,9 @@
*/
public UserFunctionTimeoutPolicy user_function_timeout_policy = UserFunctionTimeoutPolicy.die;
+ @Deprecated
public volatile boolean back_pressure_enabled = false;
+ @Deprecated
public volatile ParameterizedClass back_pressure_strategy;
public volatile int concurrent_validations;
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 5f94710..a7559f1 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -68,8 +68,6 @@
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.locator.SeedProvider;
-import org.apache.cassandra.net.BackPressureStrategy;
-import org.apache.cassandra.net.RateBasedBackPressure;
import org.apache.cassandra.security.EncryptionContext;
import org.apache.cassandra.security.SSLFactory;
import org.apache.cassandra.service.CacheService.CacheType;
@@ -137,7 +135,6 @@
private static EncryptionContext encryptionContext;
private static boolean hasLoggedConfig;
- private static BackPressureStrategy backPressureStrategy;
private static DiskOptimizationStrategy diskOptimizationStrategy;
private static boolean clientInitialized;
@@ -783,27 +780,6 @@
break;
}
- try
- {
- ParameterizedClass strategy = conf.back_pressure_strategy != null ? conf.back_pressure_strategy : RateBasedBackPressure.withDefaultParams();
- Class<?> clazz = Class.forName(strategy.class_name);
- if (!BackPressureStrategy.class.isAssignableFrom(clazz))
- throw new ConfigurationException(strategy + " is not an instance of " + BackPressureStrategy.class.getCanonicalName(), false);
-
- Constructor<?> ctor = clazz.getConstructor(Map.class);
- BackPressureStrategy instance = (BackPressureStrategy) ctor.newInstance(strategy.parameters);
- logger.info("Back-pressure is {} with strategy {}.", backPressureEnabled() ? "enabled" : "disabled", conf.back_pressure_strategy);
- backPressureStrategy = instance;
- }
- catch (ConfigurationException ex)
- {
- throw ex;
- }
- catch (Exception ex)
- {
- throw new ConfigurationException("Error configuring back-pressure strategy: " + conf.back_pressure_strategy, ex);
- }
-
if (conf.otc_coalescing_enough_coalesced_messages > 128)
throw new ConfigurationException("otc_coalescing_enough_coalesced_messages must be smaller than 128", false);
@@ -2872,16 +2848,6 @@
return unsafeSystem;
}
- public static void setBackPressureEnabled(boolean backPressureEnabled)
- {
- conf.back_pressure_enabled = backPressureEnabled;
- }
-
- public static boolean backPressureEnabled()
- {
- return conf.back_pressure_enabled;
- }
-
public static boolean diagnosticEventsEnabled()
{
return conf.diagnostic_events_enabled;
@@ -2892,17 +2858,6 @@
conf.diagnostic_events_enabled = enabled;
}
- @VisibleForTesting
- public static void setBackPressureStrategy(BackPressureStrategy strategy)
- {
- backPressureStrategy = strategy;
- }
-
- public static BackPressureStrategy getBackPressureStrategy()
- {
- return backPressureStrategy;
- }
-
public static ConsistencyLevel getIdealConsistencyLevel()
{
return conf.ideal_consistency_level;
diff --git a/src/java/org/apache/cassandra/hints/HintsDispatcher.java b/src/java/org/apache/cassandra/hints/HintsDispatcher.java
index 39e4b25..743b275 100644
--- a/src/java/org/apache/cassandra/hints/HintsDispatcher.java
+++ b/src/java/org/apache/cassandra/hints/HintsDispatcher.java
@@ -255,11 +255,5 @@
outcome = Outcome.SUCCESS;
condition.signalAll();
}
-
- @Override
- public boolean supportsBackPressure()
- {
- return true;
- }
}
}
diff --git a/src/java/org/apache/cassandra/net/BackPressureState.java b/src/java/org/apache/cassandra/net/BackPressureState.java
deleted file mode 100644
index de19bf3..0000000
--- a/src/java/org/apache/cassandra/net/BackPressureState.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.net;
-
-import org.apache.cassandra.locator.InetAddressAndPort;
-
-/**
- * Interface meant to track the back-pressure state per replica host.
- */
-public interface BackPressureState
-{
- /**
- * Called when a message is sent to a replica.
- */
- void onMessageSent(Message<?> message);
-
- /**
- * Called when a response is received from a replica.
- */
- void onResponseReceived();
-
- /**
- * Called when no response is received from replica.
- */
- void onResponseTimeout();
-
- /**
- * Gets the current back-pressure rate limit.
- */
- double getBackPressureRateLimit();
-
- /**
- * Returns the host this state refers to.
- */
- InetAddressAndPort getHost();
-}
diff --git a/src/java/org/apache/cassandra/net/BackPressureStrategy.java b/src/java/org/apache/cassandra/net/BackPressureStrategy.java
deleted file mode 100644
index 6b49495..0000000
--- a/src/java/org/apache/cassandra/net/BackPressureStrategy.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.net;
-
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.cassandra.locator.InetAddressAndPort;
-
-/**
- * Back-pressure algorithm interface.
- * <p>
- * For experts usage only. Implementors must provide a constructor accepting a single {@code Map<String, Object>} argument,
- * representing any parameters eventually required by the specific implementation.
- * </p>
- */
-public interface BackPressureStrategy<S extends BackPressureState>
-{
- /**
- * Applies the back-pressure algorithm, based and acting on the given {@link BackPressureState}s, and up to the given
- * timeout.
- */
- void apply(Set<S> states, long timeout, TimeUnit unit);
-
- /**
- * Creates a new {@link BackPressureState} initialized as needed by the specific implementation.
- */
- S newState(InetAddressAndPort host);
-}
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index beec083..0827f78 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -242,9 +242,6 @@
final ResourceLimits.Limit outboundGlobalReserveLimit =
new ResourceLimits.Concurrent(DatabaseDescriptor.getInternodeApplicationSendQueueReserveGlobalCapacityInBytes());
- // back-pressure implementation
- private final BackPressureStrategy backPressure = DatabaseDescriptor.getBackPressureStrategy();
-
private volatile boolean isShuttingDown;
@VisibleForTesting
@@ -271,7 +268,6 @@
public void sendWithCallback(Message message, InetAddressAndPort to, RequestCallback cb, ConnectionType specifyConnection)
{
callbacks.addWithExpiration(cb, message, to);
- updateBackPressureOnSend(to, cb, message);
if (cb.invokeOnFailure() && !message.callBackOnFailure())
message = message.withCallBackOnFailure();
send(message, to, specifyConnection);
@@ -292,7 +288,6 @@
{
assert message.callBackOnFailure();
callbacks.addWithExpiration(handler, message, to, handler.consistencyLevel(), allowHints);
- updateBackPressureOnSend(to.endpoint(), handler, message);
send(message, to.endpoint(), null);
}
@@ -343,74 +338,6 @@
}
}
- /**
- * Updates the back-pressure state on sending to the given host if enabled and the given message callback supports it.
- *
- * @param host The replica host the back-pressure state refers to.
- * @param callback The message callback.
- * @param message The actual message.
- */
- void updateBackPressureOnSend(InetAddressAndPort host, RequestCallback callback, Message<?> message)
- {
- if (DatabaseDescriptor.backPressureEnabled() && callback.supportsBackPressure())
- {
- BackPressureState backPressureState = getBackPressureState(host);
- if (backPressureState != null)
- backPressureState.onMessageSent(message);
- }
- }
-
- /**
- * Updates the back-pressure state on reception from the given host if enabled and the given message callback supports it.
- *
- * @param host The replica host the back-pressure state refers to.
- * @param callback The message callback.
- * @param timeout True if updated following a timeout, false otherwise.
- */
- void updateBackPressureOnReceive(InetAddressAndPort host, RequestCallback callback, boolean timeout)
- {
- if (DatabaseDescriptor.backPressureEnabled() && callback.supportsBackPressure())
- {
- BackPressureState backPressureState = getBackPressureState(host);
- if (backPressureState == null)
- return;
- if (!timeout)
- backPressureState.onResponseReceived();
- else
- backPressureState.onResponseTimeout();
- }
- }
-
- /**
- * Applies back-pressure for the given hosts, according to the configured strategy.
- *
- * If the local host is present, it is removed from the pool, as back-pressure is only applied
- * to remote hosts.
- *
- * @param hosts The hosts to apply back-pressure to.
- * @param timeoutInNanos The max back-pressure timeout.
- */
- public void applyBackPressure(Iterable<InetAddressAndPort> hosts, long timeoutInNanos)
- {
- if (DatabaseDescriptor.backPressureEnabled())
- {
- Set<BackPressureState> states = new HashSet<>();
- for (InetAddressAndPort host : hosts)
- {
- if (host.equals(FBUtilities.getBroadcastAddressAndPort()))
- continue;
- states.add(getOutbound(host).getBackPressureState());
- }
- //noinspection unchecked
- backPressure.apply(states, timeoutInNanos, NANOSECONDS);
- }
- }
-
- BackPressureState getBackPressureState(InetAddressAndPort host)
- {
- return getOutbound(host).getBackPressureState();
- }
-
void markExpiredCallback(InetAddressAndPort addr)
{
OutboundConnections conn = channelManagers.get(addr);
@@ -552,7 +479,7 @@
{
OutboundConnections connections = channelManagers.get(to);
if (connections == null)
- connections = OutboundConnections.tryRegister(channelManagers, to, new OutboundConnectionSettings(to).withDefaults(ConnectionCategory.MESSAGING), backPressure.newState(to));
+ connections = OutboundConnections.tryRegister(channelManagers, to, new OutboundConnectionSettings(to).withDefaults(ConnectionCategory.MESSAGING));
return connections;
}
diff --git a/src/java/org/apache/cassandra/net/MessagingServiceMBean.java b/src/java/org/apache/cassandra/net/MessagingServiceMBean.java
index 732a5ed..886459e 100644
--- a/src/java/org/apache/cassandra/net/MessagingServiceMBean.java
+++ b/src/java/org/apache/cassandra/net/MessagingServiceMBean.java
@@ -117,16 +117,17 @@
*/
@Deprecated
public Map<String, Double> getBackPressurePerHost();
- public Map<String, Double> getBackPressurePerHostWithPort();
/**
* Enable/Disable back-pressure
*/
+ @Deprecated
public void setBackPressureEnabled(boolean enabled);
/**
* Get back-pressure enabled state
*/
+ @Deprecated
public boolean isBackPressureEnabled();
public int getVersion(String address) throws UnknownHostException;
diff --git a/src/java/org/apache/cassandra/net/MessagingServiceMBeanImpl.java b/src/java/org/apache/cassandra/net/MessagingServiceMBeanImpl.java
index b48ae1c..bea2b8c 100644
--- a/src/java/org/apache/cassandra/net/MessagingServiceMBeanImpl.java
+++ b/src/java/org/apache/cassandra/net/MessagingServiceMBeanImpl.java
@@ -19,6 +19,7 @@
import java.io.IOException;
import java.net.UnknownHostException;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -258,33 +259,19 @@
@Override
public Map<String, Double> getBackPressurePerHost()
{
- Map<String, Double> map = new HashMap<>(channelManagers.size());
- for (Map.Entry<InetAddressAndPort, OutboundConnections> entry : channelManagers.entrySet())
- map.put(entry.getKey().toString(false), entry.getValue().getBackPressureState().getBackPressureRateLimit());
-
- return map;
- }
-
- @Override
- public Map<String, Double> getBackPressurePerHostWithPort()
- {
- Map<String, Double> map = new HashMap<>(channelManagers.size());
- for (Map.Entry<InetAddressAndPort, OutboundConnections> entry : channelManagers.entrySet())
- map.put(entry.getKey().toString(false), entry.getValue().getBackPressureState().getBackPressureRateLimit());
-
- return map;
+ throw new UnsupportedOperationException("This feature has been removed");
}
@Override
public void setBackPressureEnabled(boolean enabled)
{
- DatabaseDescriptor.setBackPressureEnabled(enabled);
+ throw new UnsupportedOperationException("This feature has been removed");
}
@Override
public boolean isBackPressureEnabled()
{
- return DatabaseDescriptor.backPressureEnabled();
+ return false;
}
@Override
diff --git a/src/java/org/apache/cassandra/net/OutboundConnections.java b/src/java/org/apache/cassandra/net/OutboundConnections.java
index c900908..029d5e1 100644
--- a/src/java/org/apache/cassandra/net/OutboundConnections.java
+++ b/src/java/org/apache/cassandra/net/OutboundConnections.java
@@ -53,7 +53,6 @@
private final SimpleCondition metricsReady = new SimpleCondition();
private volatile InternodeOutboundMetrics metrics;
- private final BackPressureState backPressureState;
private final ResourceLimits.Limit reserveCapacity;
private OutboundConnectionSettings template;
@@ -61,9 +60,8 @@
public final OutboundConnection large;
public final OutboundConnection urgent;
- private OutboundConnections(OutboundConnectionSettings template, BackPressureState backPressureState)
+ private OutboundConnections(OutboundConnectionSettings template)
{
- this.backPressureState = backPressureState;
this.template = template = template.withDefaultReserveLimits();
reserveCapacity = new ResourceLimits.Concurrent(template.applicationSendQueueReserveEndpointCapacityInBytes);
ResourceLimits.EndpointAndGlobal reserveCapacityInBytes = new ResourceLimits.EndpointAndGlobal(reserveCapacity, template.applicationSendQueueReserveGlobalCapacityInBytes);
@@ -80,12 +78,12 @@
connectionFor(msg, type).enqueue(msg);
}
- static <K> OutboundConnections tryRegister(ConcurrentMap<K, OutboundConnections> in, K key, OutboundConnectionSettings settings, BackPressureState backPressureState)
+ static <K> OutboundConnections tryRegister(ConcurrentMap<K, OutboundConnections> in, K key, OutboundConnectionSettings settings)
{
OutboundConnections connections = in.get(key);
if (connections == null)
{
- connections = new OutboundConnections(settings, backPressureState);
+ connections = new OutboundConnections(settings);
OutboundConnections existing = in.putIfAbsent(key, connections);
if (existing == null)
@@ -103,11 +101,6 @@
return connections;
}
- BackPressureState getBackPressureState()
- {
- return backPressureState;
- }
-
/**
* Reconnect to the peer using the given {@code addr}. Outstanding messages in each channel will be sent on the
* current channel. Typically this function is used for something like EC2 public IP addresses which need to be used
@@ -313,9 +306,9 @@
}
@VisibleForTesting
- static OutboundConnections unsafeCreate(OutboundConnectionSettings template, BackPressureState backPressureState)
+ static OutboundConnections unsafeCreate(OutboundConnectionSettings template)
{
- OutboundConnections connections = new OutboundConnections(template, backPressureState);
+ OutboundConnections connections = new OutboundConnections(template);
connections.metricsReady.signalAll();
return connections;
}
diff --git a/src/java/org/apache/cassandra/net/RateBasedBackPressure.java b/src/java/org/apache/cassandra/net/RateBasedBackPressure.java
deleted file mode 100644
index 02d8cce..0000000
--- a/src/java/org/apache/cassandra/net/RateBasedBackPressure.java
+++ /dev/null
@@ -1,297 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.net;
-
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-import com.github.benmanes.caffeine.cache.Cache;
-import com.github.benmanes.caffeine.cache.Caffeine;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.RateLimiter;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.ParameterizedClass;
-import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.utils.NoSpamLogger;
-import org.apache.cassandra.utils.SystemTimeSource;
-import org.apache.cassandra.utils.TimeSource;
-import org.apache.cassandra.utils.concurrent.IntervalLock;
-
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-
-/**
- * Back-pressure algorithm based on rate limiting according to the ratio between incoming and outgoing rates, computed
- * over a sliding time window with size equal to write RPC timeout.
- */
-public class RateBasedBackPressure implements BackPressureStrategy<RateBasedBackPressureState>
-{
- static final String HIGH_RATIO = "high_ratio";
- static final String FACTOR = "factor";
- static final String FLOW = "flow";
- private static final String BACK_PRESSURE_HIGH_RATIO = "0.90";
- private static final String BACK_PRESSURE_FACTOR = "5";
- private static final String BACK_PRESSURE_FLOW = "FAST";
-
- private static final Logger logger = LoggerFactory.getLogger(RateBasedBackPressure.class);
- private static final NoSpamLogger tenSecsNoSpamLogger = NoSpamLogger.getLogger(logger, 10, TimeUnit.SECONDS);
- private static final NoSpamLogger oneMinNoSpamLogger = NoSpamLogger.getLogger(logger, 1, TimeUnit.MINUTES);
-
- protected final TimeSource timeSource;
- protected final double highRatio;
- protected final int factor;
- protected final Flow flow;
- protected final long windowSize;
-
- private final Cache<Set<RateBasedBackPressureState>, IntervalRateLimiter> rateLimiters =
- Caffeine.newBuilder()
- .expireAfterAccess(1, TimeUnit.HOURS)
- .executor(MoreExecutors.directExecutor())
- .build();
-
- enum Flow
- {
- FAST,
- SLOW
- }
-
- public static ParameterizedClass withDefaultParams()
- {
- return new ParameterizedClass(RateBasedBackPressure.class.getName(),
- ImmutableMap.of(HIGH_RATIO, BACK_PRESSURE_HIGH_RATIO,
- FACTOR, BACK_PRESSURE_FACTOR,
- FLOW, BACK_PRESSURE_FLOW));
- }
-
- public RateBasedBackPressure(Map<String, Object> args)
- {
- this(args, new SystemTimeSource(), DatabaseDescriptor.getWriteRpcTimeout(MILLISECONDS));
- }
-
- @VisibleForTesting
- public RateBasedBackPressure(Map<String, Object> args, TimeSource timeSource, long windowSize)
- {
- if (args.size() != 3)
- {
- throw new IllegalArgumentException(RateBasedBackPressure.class.getCanonicalName()
- + " requires 3 arguments: high ratio, back-pressure factor and flow type.");
- }
-
- try
- {
- highRatio = Double.parseDouble(args.getOrDefault(HIGH_RATIO, "").toString().trim());
- factor = Integer.parseInt(args.getOrDefault(FACTOR, "").toString().trim());
- flow = Flow.valueOf(args.getOrDefault(FLOW, "").toString().trim().toUpperCase());
- }
- catch (Exception ex)
- {
- throw new IllegalArgumentException(ex.getMessage(), ex);
- }
-
- if (highRatio <= 0 || highRatio > 1)
- {
- throw new IllegalArgumentException("Back-pressure high ratio must be > 0 and <= 1");
- }
- if (factor < 1)
- {
- throw new IllegalArgumentException("Back-pressure factor must be >= 1");
- }
- if (windowSize < 10)
- {
- throw new IllegalArgumentException("Back-pressure window size must be >= 10");
- }
-
- this.timeSource = timeSource;
- this.windowSize = windowSize;
-
- logger.info("Initialized back-pressure with high ratio: {}, factor: {}, flow: {}, window size: {}.",
- highRatio, factor, flow, windowSize);
- }
-
- @Override
- public void apply(Set<RateBasedBackPressureState> states, long timeout, TimeUnit unit)
- {
- // Go through the back-pressure states, try updating each of them and collect min/max rates:
- boolean isUpdated = false;
- double minRateLimit = Double.POSITIVE_INFINITY;
- double maxRateLimit = Double.NEGATIVE_INFINITY;
- double minIncomingRate = Double.POSITIVE_INFINITY;
- RateLimiter currentMin = null;
- RateLimiter currentMax = null;
- for (RateBasedBackPressureState backPressure : states)
- {
- // Get the incoming/outgoing rates:
- double incomingRate = backPressure.incomingRate.get(TimeUnit.SECONDS);
- double outgoingRate = backPressure.outgoingRate.get(TimeUnit.SECONDS);
- // Compute the min incoming rate:
- if (incomingRate < minIncomingRate)
- minIncomingRate = incomingRate;
-
- // Try acquiring the interval lock:
- if (backPressure.tryIntervalLock(windowSize))
- {
- // If acquired, proceed updating thi back-pressure state rate limit:
- isUpdated = true;
- try
- {
- RateLimiter limiter = backPressure.rateLimiter;
-
- // If we have sent any outgoing requests during this time window, go ahead with rate limiting
- // (this is safe against concurrent back-pressure state updates thanks to the rw-locking in
- // RateBasedBackPressureState):
- if (outgoingRate > 0)
- {
- // Compute the incoming/outgoing ratio:
- double actualRatio = incomingRate / outgoingRate;
-
- // If the ratio is above the high mark, try growing by the back-pressure factor:
- double limiterRate = limiter.getRate();
- if (actualRatio >= highRatio)
- {
- // Only if the outgoing rate is able to keep up with the rate increase:
- if (limiterRate <= outgoingRate)
- {
- double newRate = limiterRate + ((limiterRate * factor) / 100);
- if (newRate > 0 && newRate != Double.POSITIVE_INFINITY)
- {
- limiter.setRate(newRate);
- }
- }
- }
- // If below, set the rate limiter at the incoming rate, decreased by factor:
- else
- {
- // Only if the new rate is actually less than the actual rate:
- double newRate = incomingRate - ((incomingRate * factor) / 100);
- if (newRate > 0 && newRate < limiterRate)
- {
- limiter.setRate(newRate);
- }
- }
- if (logger.isTraceEnabled())
- {
- logger.trace("Back-pressure state for {}: incoming rate {}, outgoing rate {}, ratio {}, rate limiting {}",
- backPressure.getHost(), incomingRate, outgoingRate, actualRatio, limiter.getRate());
- }
- }
- // Otherwise reset the rate limiter:
- else
- {
- limiter.setRate(Double.POSITIVE_INFINITY);
- }
-
- // Housekeeping: pruning windows and resetting the last check timestamp!
- backPressure.incomingRate.prune();
- backPressure.outgoingRate.prune();
- }
- finally
- {
- backPressure.releaseIntervalLock();
- }
- }
- if (backPressure.rateLimiter.getRate() <= minRateLimit)
- {
- minRateLimit = backPressure.rateLimiter.getRate();
- currentMin = backPressure.rateLimiter;
- }
- if (backPressure.rateLimiter.getRate() >= maxRateLimit)
- {
- maxRateLimit = backPressure.rateLimiter.getRate();
- currentMax = backPressure.rateLimiter;
- }
- }
-
- // Now find the rate limiter corresponding to the replica group represented by these back-pressure states:
- if (!states.isEmpty())
- {
- // Get the rate limiter:
- IntervalRateLimiter rateLimiter = rateLimiters.get(states, key -> new IntervalRateLimiter(timeSource));
-
- // If the back-pressure was updated and we acquire the interval lock for the rate limiter of this group:
- if (isUpdated && rateLimiter.tryIntervalLock(windowSize))
- {
- try
- {
- // Update the rate limiter value based on the configured flow:
- if (flow.equals(Flow.FAST))
- rateLimiter.limiter = currentMax;
- else
- rateLimiter.limiter = currentMin;
-
- tenSecsNoSpamLogger.info("{} currently applied for remote replicas: {}", rateLimiter.limiter, states);
- }
- finally
- {
- rateLimiter.releaseIntervalLock();
- }
- }
- // Assigning a single rate limiter per replica group once per window size allows the back-pressure rate
- // limiting to be stable within the group itself.
-
- // Finally apply the rate limit with a max pause time equal to the provided timeout minus the
- // response time computed from the incoming rate, to reduce the number of client timeouts by taking into
- // account how long it could take to process responses after back-pressure:
- long responseTimeInNanos = (long) (TimeUnit.NANOSECONDS.convert(1, TimeUnit.SECONDS) / minIncomingRate);
- doRateLimit(rateLimiter.limiter, Math.max(0, TimeUnit.NANOSECONDS.convert(timeout, unit) - responseTimeInNanos));
- }
- }
-
- @Override
- public RateBasedBackPressureState newState(InetAddressAndPort host)
- {
- return new RateBasedBackPressureState(host, timeSource, windowSize);
- }
-
- @VisibleForTesting
- RateLimiter getRateLimiterForReplicaGroup(Set<RateBasedBackPressureState> states)
- {
- IntervalRateLimiter rateLimiter = rateLimiters.getIfPresent(states);
- return rateLimiter != null ? rateLimiter.limiter : RateLimiter.create(Double.POSITIVE_INFINITY);
- }
-
- @VisibleForTesting
- boolean doRateLimit(RateLimiter rateLimiter, long timeoutInNanos)
- {
- if (!rateLimiter.tryAcquire(1, timeoutInNanos, TimeUnit.NANOSECONDS))
- {
- timeSource.sleepUninterruptibly(timeoutInNanos, TimeUnit.NANOSECONDS);
- oneMinNoSpamLogger.info("Cannot apply {} due to exceeding write timeout, pausing {} nanoseconds instead.",
- rateLimiter, timeoutInNanos);
-
- return false;
- }
-
- return true;
- }
-
- private static class IntervalRateLimiter extends IntervalLock
- {
- public volatile RateLimiter limiter = RateLimiter.create(Double.POSITIVE_INFINITY);
-
- IntervalRateLimiter(TimeSource timeSource)
- {
- super(timeSource);
- }
- }
-}
diff --git a/src/java/org/apache/cassandra/net/RateBasedBackPressureState.java b/src/java/org/apache/cassandra/net/RateBasedBackPressureState.java
deleted file mode 100644
index a150874..0000000
--- a/src/java/org/apache/cassandra/net/RateBasedBackPressureState.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.net;
-
-import java.util.concurrent.TimeUnit;
-
-import com.google.common.util.concurrent.RateLimiter;
-
-import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.utils.SlidingTimeRate;
-import org.apache.cassandra.utils.TimeSource;
-import org.apache.cassandra.utils.concurrent.IntervalLock;
-
-/**
- * The rate-based back-pressure state, tracked per replica host.
- * <p>
- * This back-pressure state is made up of the following attributes:
- * <ul>
- * <li>windowSize: the length of the back-pressure window in milliseconds.</li>
- * <li>incomingRate: the rate of back-pressure supporting incoming messages.</li>
- * <li>outgoingRate: the rate of back-pressure supporting outgoing messages.</li>
- * <li>rateLimiter: the rate limiter to eventually apply to outgoing messages.</li>
- * </ul>
- * The incomingRate and outgoingRate are updated together when a response is received to guarantee consistency between
- * the two.
- * <p>
- * It also provides methods to exclusively lock/release back-pressure windows at given intervals;
- * this allows to apply back-pressure even under concurrent modifications. Please also note a read lock is acquired
- * during response processing so that no concurrent rate updates can screw rate computations.
- * </p>
- */
-class RateBasedBackPressureState extends IntervalLock implements BackPressureState
-{
- private final InetAddressAndPort host;
- final SlidingTimeRate incomingRate;
- final SlidingTimeRate outgoingRate;
- final RateLimiter rateLimiter;
-
- RateBasedBackPressureState(InetAddressAndPort host, TimeSource timeSource, long windowSize)
- {
- super(timeSource);
- this.host = host;
- this.incomingRate = new SlidingTimeRate(timeSource, windowSize, windowSize / 10, TimeUnit.MILLISECONDS);
- this.outgoingRate = new SlidingTimeRate(timeSource, windowSize, windowSize / 10, TimeUnit.MILLISECONDS);
- this.rateLimiter = RateLimiter.create(Double.POSITIVE_INFINITY);
- }
-
- @Override
- public void onMessageSent(Message<?> message) {}
-
- @Override
- public void onResponseReceived()
- {
- readLock().lock();
- try
- {
- incomingRate.update(1);
- outgoingRate.update(1);
- }
- finally
- {
- readLock().unlock();
- }
- }
-
- @Override
- public void onResponseTimeout()
- {
- readLock().lock();
- try
- {
- outgoingRate.update(1);
- }
- finally
- {
- readLock().unlock();
- }
- }
-
- @Override
- public double getBackPressureRateLimit()
- {
- return rateLimiter.getRate();
- }
-
- @Override
- public InetAddressAndPort getHost()
- {
- return host;
- }
-
- @Override
- public boolean equals(Object obj)
- {
- if (obj instanceof RateBasedBackPressureState)
- {
- RateBasedBackPressureState other = (RateBasedBackPressureState) obj;
- return this.host.equals(other.host);
- }
- return false;
- }
-
- @Override
- public int hashCode()
- {
- return this.host.hashCode();
- }
-
- @Override
- public String toString()
- {
- return String.format("[host: %s, incoming rate: %.3f, outgoing rate: %.3f, rate limit: %.3f]",
- host, incomingRate.get(TimeUnit.SECONDS), outgoingRate.get(TimeUnit.SECONDS), rateLimiter.getRate());
- }
-}
diff --git a/src/java/org/apache/cassandra/net/RequestCallback.java b/src/java/org/apache/cassandra/net/RequestCallback.java
index 9ed3a4b..5bbe011 100644
--- a/src/java/org/apache/cassandra/net/RequestCallback.java
+++ b/src/java/org/apache/cassandra/net/RequestCallback.java
@@ -56,9 +56,4 @@
{
return false;
}
-
- default boolean supportsBackPressure()
- {
- return false;
- }
}
diff --git a/src/java/org/apache/cassandra/net/RequestCallbacks.java b/src/java/org/apache/cassandra/net/RequestCallbacks.java
index d5424ed..94044e1 100644
--- a/src/java/org/apache/cassandra/net/RequestCallbacks.java
+++ b/src/java/org/apache/cassandra/net/RequestCallbacks.java
@@ -167,9 +167,6 @@
InternodeOutboundMetrics.totalExpiredCallbacks.mark();
messagingService.markExpiredCallback(info.peer);
- if (info.callback.supportsBackPressure())
- messagingService.updateBackPressureOnReceive(info.peer, info.callback, true);
-
if (info.invokeOnFailure())
INTERNAL_RESPONSE.submit(() -> info.callback.onFailure(info.peer, RequestFailureReason.TIMEOUT));
diff --git a/src/java/org/apache/cassandra/net/ResponseVerbHandler.java b/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
index e5779ab..369e5f4 100644
--- a/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
+++ b/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
@@ -57,10 +57,5 @@
MessagingService.instance().latencySubscribers.maybeAdd(cb, message.from(), latencyNanos, NANOSECONDS);
cb.onResponse(message);
}
-
- if (callbackInfo.callback.supportsBackPressure())
- {
- MessagingService.instance().updateBackPressureOnReceive(message.from(), cb, false);
- }
}
}
diff --git a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
index b1eb5b3..4f384a4 100644
--- a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
@@ -63,7 +63,6 @@
private volatile int failures = 0;
private final Map<InetAddressAndPort, RequestFailureReason> failureReasonByEndpoint;
private final long queryStartNanoTime;
- private volatile boolean supportsBackPressure = true;
/**
* Delegate to another WriteResponseHandler or possibly this one to track if the ideal consistency level was reached.
@@ -270,17 +269,6 @@
return true;
}
- @Override
- public boolean supportsBackPressure()
- {
- return supportsBackPressure;
- }
-
- public void setSupportsBackPressure(boolean supportsBackPressure)
- {
- this.supportsBackPressure = supportsBackPressure;
- }
-
/**
* Decrement the counter for all responses/expirations and if the counter
* hits 0 check to see if the ideal consistency level (this write response handler)
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 0aedd3d..a5a6e3c 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -617,7 +617,6 @@
{
AbstractReplicationStrategy rs = keyspace.getReplicationStrategy();
responseHandler = rs.getWriteResponseHandler(replicaPlan, null, WriteType.SIMPLE, queryStartNanoTime);
- responseHandler.setSupportsBackPressure(false);
}
Message<Commit> message = Message.outWithFlag(PAXOS_COMMIT_REQ, proposal, MessageFlag.CALL_BACK_ON_FAILURE);
@@ -1339,9 +1338,6 @@
}
}
- if (backPressureHosts != null)
- MessagingService.instance().applyBackPressure(backPressureHosts, responseHandler.currentTimeoutNanos());
-
if (endpointsToHint != null)
submitHint(mutation, EndpointsForToken.copyOf(mutation.key().getToken(), endpointsToHint), responseHandler);
diff --git a/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java b/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
index c43d622..576862b 100644
--- a/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
+++ b/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
@@ -140,7 +140,6 @@
"org.apache.cassandra.locator.Replica",
"org.apache.cassandra.locator.SimpleSeedProvider",
"org.apache.cassandra.locator.SeedProvider",
- "org.apache.cassandra.net.BackPressureStrategy",
"org.apache.cassandra.security.EncryptionContext",
"org.apache.cassandra.service.CacheService$CacheType",
"org.apache.cassandra.utils.binlog.BinLogOptions",
diff --git a/test/unit/org/apache/cassandra/locator/ReconnectableSnitchHelperTest.java b/test/unit/org/apache/cassandra/locator/ReconnectableSnitchHelperTest.java
index b1c3775..0c2f272 100644
--- a/test/unit/org/apache/cassandra/locator/ReconnectableSnitchHelperTest.java
+++ b/test/unit/org/apache/cassandra/locator/ReconnectableSnitchHelperTest.java
@@ -38,7 +38,6 @@
public static void beforeClass() throws UnknownHostException
{
DatabaseDescriptor.daemonInitialization();
- DatabaseDescriptor.setBackPressureStrategy(new MessagingServiceTest.MockBackPressureStrategy(Collections.emptyMap()));
DatabaseDescriptor.setBroadcastAddress(InetAddress.getByName("127.0.0.1"));
}
diff --git a/test/unit/org/apache/cassandra/net/MessagingServiceTest.java b/test/unit/org/apache/cassandra/net/MessagingServiceTest.java
index 9ce041b..7bae3fa 100644
--- a/test/unit/org/apache/cassandra/net/MessagingServiceTest.java
+++ b/test/unit/org/apache/cassandra/net/MessagingServiceTest.java
@@ -85,7 +85,6 @@
{
DatabaseDescriptor.daemonInitialization();
CommitLog.instance.start();
- DatabaseDescriptor.setBackPressureStrategy(new MockBackPressureStrategy(Collections.emptyMap()));
DatabaseDescriptor.setBroadcastAddress(InetAddress.getByName("127.0.0.1"));
originalAuthenticator = DatabaseDescriptor.getInternodeAuthenticator();
originalServerEncryptionOptions = DatabaseDescriptor.getInternodeMessagingEncyptionOptions();
@@ -98,7 +97,6 @@
public void before() throws UnknownHostException
{
messagingService.metrics.resetDroppedMessages(Integer.toString(metricScopeId++));
- MockBackPressureStrategy.applied = false;
messagingService.closeOutbound(InetAddressAndPort.getByName("127.0.0.2"));
messagingService.closeOutbound(InetAddressAndPort.getByName("127.0.0.3"));
}
@@ -213,195 +211,11 @@
assertNull(queueWaitLatency.get(verb));
}
- @Test
- public void testUpdatesBackPressureOnSendWhenEnabledAndWithSupportedCallback() throws UnknownHostException
- {
- MockBackPressureStrategy.MockBackPressureState backPressureState = (MockBackPressureStrategy.MockBackPressureState) messagingService.getBackPressureState(InetAddressAndPort.getByName("127.0.0.2"));
- RequestCallback bpCallback = new BackPressureCallback();
- RequestCallback noCallback = new NoBackPressureCallback();
- Message<?> ignored = null;
-
- DatabaseDescriptor.setBackPressureEnabled(true);
- messagingService.updateBackPressureOnSend(InetAddressAndPort.getByName("127.0.0.2"), noCallback, ignored);
- assertFalse(backPressureState.onSend);
-
- DatabaseDescriptor.setBackPressureEnabled(false);
- messagingService.updateBackPressureOnSend(InetAddressAndPort.getByName("127.0.0.2"), bpCallback, ignored);
- assertFalse(backPressureState.onSend);
-
- DatabaseDescriptor.setBackPressureEnabled(true);
- messagingService.updateBackPressureOnSend(InetAddressAndPort.getByName("127.0.0.2"), bpCallback, ignored);
- assertTrue(backPressureState.onSend);
- }
-
- @Test
- public void testUpdatesBackPressureOnReceiveWhenEnabledAndWithSupportedCallback() throws UnknownHostException
- {
- MockBackPressureStrategy.MockBackPressureState backPressureState = (MockBackPressureStrategy.MockBackPressureState) messagingService.getBackPressureState(InetAddressAndPort.getByName("127.0.0.2"));
- RequestCallback bpCallback = new BackPressureCallback();
- RequestCallback noCallback = new NoBackPressureCallback();
- boolean timeout = false;
-
- DatabaseDescriptor.setBackPressureEnabled(true);
- messagingService.updateBackPressureOnReceive(InetAddressAndPort.getByName("127.0.0.2"), noCallback, timeout);
- assertFalse(backPressureState.onReceive);
- assertFalse(backPressureState.onTimeout);
-
- DatabaseDescriptor.setBackPressureEnabled(false);
- messagingService.updateBackPressureOnReceive(InetAddressAndPort.getByName("127.0.0.2"), bpCallback, timeout);
- assertFalse(backPressureState.onReceive);
- assertFalse(backPressureState.onTimeout);
-
- DatabaseDescriptor.setBackPressureEnabled(true);
- messagingService.updateBackPressureOnReceive(InetAddressAndPort.getByName("127.0.0.2"), bpCallback, timeout);
- assertTrue(backPressureState.onReceive);
- assertFalse(backPressureState.onTimeout);
- }
-
- @Test
- public void testUpdatesBackPressureOnTimeoutWhenEnabledAndWithSupportedCallback() throws UnknownHostException
- {
- MockBackPressureStrategy.MockBackPressureState backPressureState = (MockBackPressureStrategy.MockBackPressureState) messagingService.getBackPressureState(InetAddressAndPort.getByName("127.0.0.2"));
- RequestCallback bpCallback = new BackPressureCallback();
- RequestCallback noCallback = new NoBackPressureCallback();
- boolean timeout = true;
-
- DatabaseDescriptor.setBackPressureEnabled(true);
- messagingService.updateBackPressureOnReceive(InetAddressAndPort.getByName("127.0.0.2"), noCallback, timeout);
- assertFalse(backPressureState.onReceive);
- assertFalse(backPressureState.onTimeout);
-
- DatabaseDescriptor.setBackPressureEnabled(false);
- messagingService.updateBackPressureOnReceive(InetAddressAndPort.getByName("127.0.0.2"), bpCallback, timeout);
- assertFalse(backPressureState.onReceive);
- assertFalse(backPressureState.onTimeout);
-
- DatabaseDescriptor.setBackPressureEnabled(true);
- messagingService.updateBackPressureOnReceive(InetAddressAndPort.getByName("127.0.0.2"), bpCallback, timeout);
- assertFalse(backPressureState.onReceive);
- assertTrue(backPressureState.onTimeout);
- }
-
- @Test
- public void testAppliesBackPressureWhenEnabled() throws UnknownHostException
- {
- DatabaseDescriptor.setBackPressureEnabled(false);
- messagingService.applyBackPressure(Arrays.asList(InetAddressAndPort.getByName("127.0.0.2")), ONE_SECOND);
- assertFalse(MockBackPressureStrategy.applied);
-
- DatabaseDescriptor.setBackPressureEnabled(true);
- messagingService.applyBackPressure(Arrays.asList(InetAddressAndPort.getByName("127.0.0.2")), ONE_SECOND);
- assertTrue(MockBackPressureStrategy.applied);
- }
-
- @Test
- public void testDoesntApplyBackPressureToBroadcastAddress() throws UnknownHostException
- {
- DatabaseDescriptor.setBackPressureEnabled(true);
- messagingService.applyBackPressure(Arrays.asList(InetAddressAndPort.getByName("127.0.0.1")), ONE_SECOND);
- assertFalse(MockBackPressureStrategy.applied);
- }
-
private static void addDCLatency(long sentAt, long nowTime) throws IOException
{
MessagingService.instance().metrics.internodeLatencyRecorder(InetAddressAndPort.getLocalHost()).accept(nowTime - sentAt, MILLISECONDS);
}
- public static class MockBackPressureStrategy implements BackPressureStrategy<MockBackPressureStrategy.MockBackPressureState>
- {
- public static volatile boolean applied = false;
-
- public MockBackPressureStrategy(Map<String, Object> args)
- {
- }
-
- @Override
- public void apply(Set<MockBackPressureState> states, long timeout, TimeUnit unit)
- {
- if (!Iterables.isEmpty(states))
- applied = true;
- }
-
- @Override
- public MockBackPressureState newState(InetAddressAndPort host)
- {
- return new MockBackPressureState(host);
- }
-
- public static class MockBackPressureState implements BackPressureState
- {
- private final InetAddressAndPort host;
- public volatile boolean onSend = false;
- public volatile boolean onReceive = false;
- public volatile boolean onTimeout = false;
-
- private MockBackPressureState(InetAddressAndPort host)
- {
- this.host = host;
- }
-
- @Override
- public void onMessageSent(Message<?> message)
- {
- onSend = true;
- }
-
- @Override
- public void onResponseReceived()
- {
- onReceive = true;
- }
-
- @Override
- public void onResponseTimeout()
- {
- onTimeout = true;
- }
-
- @Override
- public double getBackPressureRateLimit()
- {
- throw new UnsupportedOperationException("Not supported yet.");
- }
-
- @Override
- public InetAddressAndPort getHost()
- {
- return host;
- }
- }
- }
-
- private static class BackPressureCallback implements RequestCallback
- {
- @Override
- public boolean supportsBackPressure()
- {
- return true;
- }
-
- @Override
- public void onResponse(Message msg)
- {
- throw new UnsupportedOperationException("Not supported.");
- }
- }
-
- private static class NoBackPressureCallback implements RequestCallback
- {
- @Override
- public boolean supportsBackPressure()
- {
- return false;
- }
-
- @Override
- public void onResponse(Message msg)
- {
- throw new UnsupportedOperationException("Not supported.");
- }
- }
-
/**
* Make sure that if internode authenticatino fails for an outbound connection that all the code that relies
* on getting the connection pool handles the null return
diff --git a/test/unit/org/apache/cassandra/net/OutboundConnectionsTest.java b/test/unit/org/apache/cassandra/net/OutboundConnectionsTest.java
index 82543e1..32faea3 100644
--- a/test/unit/org/apache/cassandra/net/OutboundConnectionsTest.java
+++ b/test/unit/org/apache/cassandra/net/OutboundConnectionsTest.java
@@ -39,13 +39,6 @@
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.net.BackPressureState;
-import org.apache.cassandra.net.ConnectionType;
-import org.apache.cassandra.net.Message;
-import org.apache.cassandra.net.OutboundConnectionSettings;
-import org.apache.cassandra.net.OutboundConnections;
-import org.apache.cassandra.net.PingRequest;
-import org.apache.cassandra.net.Verb;
public class OutboundConnectionsTest
{
@@ -66,8 +59,7 @@
@Before
public void setup()
{
- BackPressureState backPressureState = DatabaseDescriptor.getBackPressureStrategy().newState(REMOTE_ADDR);
- connections = OutboundConnections.unsafeCreate(new OutboundConnectionSettings(REMOTE_ADDR), backPressureState);
+ connections = OutboundConnections.unsafeCreate(new OutboundConnectionSettings(REMOTE_ADDR));
}
@After
diff --git a/test/unit/org/apache/cassandra/net/RateBasedBackPressureTest.java b/test/unit/org/apache/cassandra/net/RateBasedBackPressureTest.java
deleted file mode 100644
index 4d1ae01..0000000
--- a/test/unit/org/apache/cassandra/net/RateBasedBackPressureTest.java
+++ /dev/null
@@ -1,408 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-package org.apache.cassandra.net;
-
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.RateLimiter;
-
-import org.junit.Test;
-
-import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.utils.TestTimeSource;
-import org.apache.cassandra.utils.TimeSource;
-
-import static org.apache.cassandra.net.RateBasedBackPressure.FACTOR;
-import static org.apache.cassandra.net.RateBasedBackPressure.FLOW;
-import static org.apache.cassandra.net.RateBasedBackPressure.HIGH_RATIO;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-public class RateBasedBackPressureTest
-{
- @Test(expected = IllegalArgumentException.class)
- public void testAcceptsNoLessThanThreeArguments() throws Exception
- {
- new RateBasedBackPressure(ImmutableMap.of(HIGH_RATIO, "1"), new TestTimeSource(), 10);
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testHighRatioMustBeBiggerThanZero() throws Exception
- {
- new RateBasedBackPressure(ImmutableMap.of(HIGH_RATIO, "0", FACTOR, "2", FLOW, "FAST"), new TestTimeSource(), 10);
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testHighRatioMustBeSmallerEqualThanOne() throws Exception
- {
- new RateBasedBackPressure(ImmutableMap.of(HIGH_RATIO, "2", FACTOR, "2", FLOW, "FAST"), new TestTimeSource(), 10);
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testFactorMustBeBiggerEqualThanOne() throws Exception
- {
- new RateBasedBackPressure(ImmutableMap.of(HIGH_RATIO, "0.9", FACTOR, "0", FLOW, "FAST"), new TestTimeSource(), 10);
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testWindowSizeMustBeBiggerEqualThanTen() throws Exception
- {
- new RateBasedBackPressure(ImmutableMap.of(HIGH_RATIO, "0.9", FACTOR, "5", FLOW, "FAST"), new TestTimeSource(), 1);
- }
-
- @Test
- public void testFlowMustBeEitherFASTorSLOW() throws Exception
- {
- new RateBasedBackPressure(ImmutableMap.of(HIGH_RATIO, "0.9", FACTOR, "1", FLOW, "FAST"), new TestTimeSource(), 10);
- new RateBasedBackPressure(ImmutableMap.of(HIGH_RATIO, "0.9", FACTOR, "1", FLOW, "SLOW"), new TestTimeSource(), 10);
- try
- {
- new RateBasedBackPressure(ImmutableMap.of(HIGH_RATIO, "0.9", FACTOR, "1", FLOW, "WRONG"), new TestTimeSource(), 10);
- fail("Expected to fail with wrong flow type.");
- }
- catch (Exception ex)
- {
- }
- }
-
- @Test
- public void testBackPressureStateUpdates()
- {
- long windowSize = 6000;
- TestTimeSource timeSource = new TestTimeSource();
- RateBasedBackPressure strategy = new RateBasedBackPressure(ImmutableMap.of(HIGH_RATIO, "0.9", FACTOR, "10", FLOW, "FAST"), timeSource, windowSize);
-
- RateBasedBackPressureState state = strategy.newState(InetAddressAndPort.getLoopbackAddress());
- state.onMessageSent(null);
- assertEquals(0, state.incomingRate.size());
- assertEquals(0, state.outgoingRate.size());
-
- state = strategy.newState(InetAddressAndPort.getLoopbackAddress());
- state.onResponseReceived();
- assertEquals(1, state.incomingRate.size());
- assertEquals(1, state.outgoingRate.size());
-
- state = strategy.newState(InetAddressAndPort.getLoopbackAddress());
- state.onResponseTimeout();
- assertEquals(0, state.incomingRate.size());
- assertEquals(1, state.outgoingRate.size());
- }
-
- @Test
- public void testBackPressureIsNotUpdatedBeyondInfinity() throws Exception
- {
- long windowSize = 6000;
- TestTimeSource timeSource = new TestTimeSource();
- RateBasedBackPressure strategy = new RateBasedBackPressure(ImmutableMap.of(HIGH_RATIO, "0.9", FACTOR, "10", FLOW, "FAST"), timeSource, windowSize);
- RateBasedBackPressureState state = strategy.newState(InetAddressAndPort.getLoopbackAddress());
-
- // Get initial rate:
- double initialRate = state.rateLimiter.getRate();
- assertEquals(Double.POSITIVE_INFINITY, initialRate, 0.0);
-
- // Update incoming and outgoing rate equally:
- state.incomingRate.update(1);
- state.outgoingRate.update(1);
-
- // Move time ahead:
- timeSource.sleep(windowSize, TimeUnit.MILLISECONDS);
-
- // Verify the rate doesn't change because already at infinity:
- strategy.apply(Sets.newHashSet(state), 1, TimeUnit.SECONDS);
- assertEquals(initialRate, state.rateLimiter.getRate(), 0.0);
- }
-
- @Test
- public void testBackPressureIsUpdatedOncePerWindowSize() throws Exception
- {
- long windowSize = 6000;
- TestTimeSource timeSource = new TestTimeSource();
- RateBasedBackPressure strategy = new RateBasedBackPressure(ImmutableMap.of(HIGH_RATIO, "0.9", FACTOR, "10", FLOW, "FAST"), timeSource, windowSize);
- RateBasedBackPressureState state = strategy.newState(InetAddressAndPort.getLoopbackAddress());
-
- // Get initial time:
- long current = state.getLastIntervalAcquire();
- assertEquals(0, current);
-
- // Update incoming and outgoing rate:
- state.incomingRate.update(1);
- state.outgoingRate.update(1);
-
- // Move time ahead by window size:
- timeSource.sleep(windowSize, TimeUnit.MILLISECONDS);
-
- // Verify the timestamp changed:
- strategy.apply(Sets.newHashSet(state), 1, TimeUnit.SECONDS);
- current = state.getLastIntervalAcquire();
- assertEquals(timeSource.currentTimeMillis(), current);
-
- // Move time ahead by less than interval:
- long previous = current;
- timeSource.sleep(windowSize / 2, TimeUnit.MILLISECONDS);
-
- // Verify the last timestamp didn't change because below the window size:
- strategy.apply(Sets.newHashSet(state), 1, TimeUnit.SECONDS);
- current = state.getLastIntervalAcquire();
- assertEquals(previous, current);
- }
-
- @Test
- public void testBackPressureWhenBelowHighRatio() throws Exception
- {
- long windowSize = 6000;
- TestTimeSource timeSource = new TestTimeSource();
- RateBasedBackPressure strategy = new RateBasedBackPressure(ImmutableMap.of(HIGH_RATIO, "0.9", FACTOR, "10", FLOW, "FAST"), timeSource, windowSize);
- RateBasedBackPressureState state = strategy.newState(InetAddressAndPort.getLoopbackAddress());
-
- // Update incoming and outgoing rate so that the ratio is 0.5:
- state.incomingRate.update(50);
- state.outgoingRate.update(100);
-
- // Move time ahead:
- timeSource.sleep(windowSize, TimeUnit.MILLISECONDS);
-
- // Verify the rate is decreased by factor:
- strategy.apply(Sets.newHashSet(state), 1, TimeUnit.SECONDS);
- assertEquals(7.4, state.rateLimiter.getRate(), 0.1);
- }
-
- @Test
- public void testBackPressureRateLimiterIsIncreasedAfterGoingAgainAboveHighRatio() throws Exception
- {
- long windowSize = 6000;
- TestTimeSource timeSource = new TestTimeSource();
- RateBasedBackPressure strategy = new RateBasedBackPressure(ImmutableMap.of(HIGH_RATIO, "0.9", FACTOR, "10", FLOW, "FAST"), timeSource, windowSize);
- RateBasedBackPressureState state = strategy.newState(InetAddressAndPort.getLoopbackAddress());
-
- // Update incoming and outgoing rate so that the ratio is 0.5:
- state.incomingRate.update(50);
- state.outgoingRate.update(100);
-
- // Move time ahead:
- timeSource.sleep(windowSize, TimeUnit.MILLISECONDS);
-
- // Verify the rate decreased:
- strategy.apply(Sets.newHashSet(state), 1, TimeUnit.SECONDS);
- assertEquals(7.4, state.rateLimiter.getRate(), 0.1);
-
- // Update incoming and outgoing rate back above high rate:
- state.incomingRate.update(50);
- state.outgoingRate.update(50);
-
- // Move time ahead:
- timeSource.sleep(windowSize, TimeUnit.MILLISECONDS);
-
- // Verify rate limiter is increased by factor:
- strategy.apply(Sets.newHashSet(state), 1, TimeUnit.SECONDS);
- assertEquals(8.25, state.rateLimiter.getRate(), 0.1);
-
- // Update incoming and outgoing rate to keep it below the limiter rate:
- state.incomingRate.update(1);
- state.outgoingRate.update(1);
-
- // Move time ahead:
- timeSource.sleep(windowSize, TimeUnit.MILLISECONDS);
-
- // Verify rate limiter is not increased as already higher than the actual rate:
- strategy.apply(Sets.newHashSet(state), 1, TimeUnit.SECONDS);
- assertEquals(8.25, state.rateLimiter.getRate(), 0.1);
- }
-
- @Test
- public void testBackPressureFastFlow() throws Exception
- {
- long windowSize = 6000;
- TestTimeSource timeSource = new TestTimeSource();
- TestableBackPressure strategy = new TestableBackPressure(ImmutableMap.of(HIGH_RATIO, "0.9", FACTOR, "10", FLOW, "FAST"), timeSource, windowSize);
- RateBasedBackPressureState state1 = strategy.newState(InetAddressAndPort.getByName("127.0.0.1"));
- RateBasedBackPressureState state2 = strategy.newState(InetAddressAndPort.getByName("127.0.0.2"));
- RateBasedBackPressureState state3 = strategy.newState(InetAddressAndPort.getByName("127.0.0.3"));
-
- // Update incoming and outgoing rates:
- state1.incomingRate.update(50);
- state1.outgoingRate.update(100);
- state2.incomingRate.update(80); // fast
- state2.outgoingRate.update(100);
- state3.incomingRate.update(20);
- state3.outgoingRate.update(100);
-
- // Move time ahead:
- timeSource.sleep(windowSize, TimeUnit.MILLISECONDS);
-
- // Verify the fast replica rate limiting has been applied:
- Set<RateBasedBackPressureState> replicaGroup = Sets.newHashSet(state1, state2, state3);
- strategy.apply(replicaGroup, 1, TimeUnit.SECONDS);
- assertTrue(strategy.checkAcquired());
- assertTrue(strategy.checkApplied());
- assertEquals(12.0, strategy.getRateLimiterForReplicaGroup(replicaGroup).getRate(), 0.1);
- }
-
- @Test
- public void testBackPressureSlowFlow() throws Exception
- {
- long windowSize = 6000;
- TestTimeSource timeSource = new TestTimeSource();
- TestableBackPressure strategy = new TestableBackPressure(ImmutableMap.of(HIGH_RATIO, "0.9", FACTOR, "10", FLOW, "SLOW"), timeSource, windowSize);
- RateBasedBackPressureState state1 = strategy.newState(InetAddressAndPort.getByName("127.0.0.1"));
- RateBasedBackPressureState state2 = strategy.newState(InetAddressAndPort.getByName("127.0.0.2"));
- RateBasedBackPressureState state3 = strategy.newState(InetAddressAndPort.getByName("127.0.0.3"));
-
- // Update incoming and outgoing rates:
- state1.incomingRate.update(50);
- state1.outgoingRate.update(100);
- state2.incomingRate.update(100);
- state2.outgoingRate.update(100);
- state3.incomingRate.update(20); // slow
- state3.outgoingRate.update(100);
-
- // Move time ahead:
- timeSource.sleep(windowSize, TimeUnit.MILLISECONDS);
-
- // Verify the slow replica rate limiting has been applied:
- Set<RateBasedBackPressureState> replicaGroup = Sets.newHashSet(state1, state2, state3);
- strategy.apply(replicaGroup, 1, TimeUnit.SECONDS);
- assertTrue(strategy.checkAcquired());
- assertTrue(strategy.checkApplied());
- assertEquals(3.0, strategy.getRateLimiterForReplicaGroup(replicaGroup).getRate(), 0.1);
- }
-
- @Test
- public void testBackPressureWithDifferentGroups() throws Exception
- {
- long windowSize = 6000;
- TestTimeSource timeSource = new TestTimeSource();
- TestableBackPressure strategy = new TestableBackPressure(ImmutableMap.of(HIGH_RATIO, "0.9", FACTOR, "10", FLOW, "SLOW"), timeSource, windowSize);
- RateBasedBackPressureState state1 = strategy.newState(InetAddressAndPort.getByName("127.0.0.1"));
- RateBasedBackPressureState state2 = strategy.newState(InetAddressAndPort.getByName("127.0.0.2"));
- RateBasedBackPressureState state3 = strategy.newState(InetAddressAndPort.getByName("127.0.0.3"));
- RateBasedBackPressureState state4 = strategy.newState(InetAddressAndPort.getByName("127.0.0.4"));
-
- // Update incoming and outgoing rates:
- state1.incomingRate.update(50); // this
- state1.outgoingRate.update(100);
- state2.incomingRate.update(100);
- state2.outgoingRate.update(100);
- state3.incomingRate.update(20); // this
- state3.outgoingRate.update(100);
- state4.incomingRate.update(80);
- state4.outgoingRate.update(100);
-
- // Move time ahead:
- timeSource.sleep(windowSize, TimeUnit.MILLISECONDS);
-
- // Verify the first group:
- Set<RateBasedBackPressureState> replicaGroup = Sets.newHashSet(state1, state2);
- strategy.apply(replicaGroup, 1, TimeUnit.SECONDS);
- assertTrue(strategy.checkAcquired());
- assertTrue(strategy.checkApplied());
- assertEquals(7.4, strategy.getRateLimiterForReplicaGroup(replicaGroup).getRate(), 0.1);
-
- // Verify the second group:
- replicaGroup = Sets.newHashSet(state3, state4);
- strategy.apply(replicaGroup, 1, TimeUnit.SECONDS);
- assertTrue(strategy.checkAcquired());
- assertTrue(strategy.checkApplied());
- assertEquals(3.0, strategy.getRateLimiterForReplicaGroup(replicaGroup).getRate(), 0.1);
- }
-
- @Test
- public void testBackPressurePastTimeout() throws Exception
- {
- long windowSize = 10000;
- TestTimeSource timeSource = new TestTimeSource();
- TestableBackPressure strategy = new TestableBackPressure(ImmutableMap.of(HIGH_RATIO, "0.9", FACTOR, "10", FLOW, "SLOW"), timeSource, windowSize);
- RateBasedBackPressureState state1 = strategy.newState(InetAddressAndPort.getByName("127.0.0.1"));
- RateBasedBackPressureState state2 = strategy.newState(InetAddressAndPort.getByName("127.0.0.2"));
- RateBasedBackPressureState state3 = strategy.newState(InetAddressAndPort.getByName("127.0.0.3"));
-
- // Update incoming and outgoing rates:
- state1.incomingRate.update(5); // slow
- state1.outgoingRate.update(100);
- state2.incomingRate.update(100);
- state2.outgoingRate.update(100);
- state3.incomingRate.update(100);
- state3.outgoingRate.update(100);
-
- // Move time ahead:
- timeSource.sleep(windowSize, TimeUnit.MILLISECONDS);
-
- // Verify the slow replica rate limiting has been applied:
- Set<RateBasedBackPressureState> replicaGroup = Sets.newHashSet(state1, state2, state3);
- strategy.apply(replicaGroup, 4, TimeUnit.SECONDS);
- assertTrue(strategy.checkAcquired());
- assertTrue(strategy.checkApplied());
- assertEquals(0.5, strategy.getRateLimiterForReplicaGroup(replicaGroup).getRate(), 0.1);
-
- // Make one more apply call to saturate the rate limit timeout (0.5 requests per second means 2 requests span
- // 4 seconds, but we can only make one as we have to subtract the incoming response time):
- strategy.apply(replicaGroup, 4, TimeUnit.SECONDS);
-
- // Now verify another call to apply doesn't acquire the rate limit because of the max timeout of 4 seconds minus
- // 2 seconds of response time, so the time source itself sleeps two second:
- long start = timeSource.currentTimeMillis();
- strategy.apply(replicaGroup, 4, TimeUnit.SECONDS);
- assertFalse(strategy.checkAcquired());
- assertTrue(strategy.checkApplied());
- assertEquals(TimeUnit.NANOSECONDS.convert(2, TimeUnit.SECONDS),
- strategy.timeout);
- assertEquals(strategy.timeout,
- TimeUnit.NANOSECONDS.convert(timeSource.currentTimeMillis() - start, TimeUnit.MILLISECONDS));
- }
-
- public static class TestableBackPressure extends RateBasedBackPressure
- {
- public volatile boolean acquired = false;
- public volatile boolean applied = false;
- public volatile long timeout;
-
- public TestableBackPressure(Map<String, Object> args, TimeSource timeSource, long windowSize)
- {
- super(args, timeSource, windowSize);
- }
-
- @Override
- public boolean doRateLimit(RateLimiter rateLimiter, long timeoutInNanos)
- {
- acquired = super.doRateLimit(rateLimiter, timeoutInNanos);
- applied = true;
- timeout = timeoutInNanos;
- return acquired;
- }
-
- public boolean checkAcquired()
- {
- boolean checked = acquired;
- acquired = false;
- return checked;
- }
-
- public boolean checkApplied()
- {
- boolean checked = applied;
- applied = false;
- return checked;
- }
- }
-}