/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.locator;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.WriteType;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.RingPosition;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.locator.ReplicaCollection.Builder.Conflict;
import org.apache.cassandra.service.AbstractWriteResponseHandler;
import org.apache.cassandra.service.DatacenterSyncWriteResponseHandler;
import org.apache.cassandra.service.DatacenterWriteResponseHandler;
import org.apache.cassandra.service.WriteResponseHandler;
import org.apache.cassandra.utils.FBUtilities;
import org.cliffc.high_scale_lib.NonBlockingHashMap;
/**
* An abstract parent for all replication strategies.
*/
public abstract class AbstractReplicationStrategy
{
private static final Logger logger = LoggerFactory.getLogger(AbstractReplicationStrategy.class);
public final Map<String, String> configOptions;
protected final String keyspaceName;
private final TokenMetadata tokenMetadata;
private final ReplicaCache<Token, EndpointsForRange> replicas = new ReplicaCache<>();
public IEndpointSnitch snitch;
protected AbstractReplicationStrategy(String keyspaceName, TokenMetadata tokenMetadata, IEndpointSnitch snitch, Map<String, String> configOptions)
{
assert snitch != null;
assert tokenMetadata != null;
this.tokenMetadata = tokenMetadata;
this.snitch = snitch;
this.configOptions = configOptions == null ? Collections.<String, String>emptyMap() : configOptions;
this.keyspaceName = keyspaceName;
}
public EndpointsForRange getCachedReplicas(long ringVersion, Token t)
{
return replicas.get(ringVersion, t);
}
/**
* Get the (possibly cached) endpoints that should store the given Token.
* Note that while the endpoints are conceptually a Set (no duplicates will be included),
* we return a List to avoid an extra allocation when sorting by proximity later.
* @param searchPosition the position the natural endpoints are requested for
* @return a copy of the natural endpoints for the given token
*/
public EndpointsForToken getNaturalReplicasForToken(RingPosition<?> searchPosition)
{
return getNaturalReplicas(searchPosition).forToken(searchPosition.getToken());
}
public EndpointsForRange getNaturalReplicas(RingPosition<?> searchPosition)
{
Token searchToken = searchPosition.getToken();
long currentRingVersion = tokenMetadata.getRingVersion();
Token keyToken = TokenMetadata.firstToken(tokenMetadata.sortedTokens(), searchToken);
EndpointsForRange endpoints = getCachedReplicas(currentRingVersion, keyToken);
if (endpoints == null)
{
TokenMetadata tm = tokenMetadata.cachedOnlyTokenMap();
// if our cache got invalidated, it's possible there is a new token to account for too
keyToken = TokenMetadata.firstToken(tm.sortedTokens(), searchToken);
endpoints = calculateNaturalReplicas(searchToken, tm);
replicas.put(tm.getRingVersion(), keyToken, endpoints);
}
return endpoints;
}
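/*
 * Illustrative sketch (not part of this class): how a read-path caller might resolve the replicas
 * for a partition's token via the cached lookup above. The strategy and token values are
 * hypothetical placeholders supplied by the caller's context.
 *
 *   AbstractReplicationStrategy strategy = ...; // the keyspace's configured strategy
 *   Token token = ...;                          // token of the partition being read
 *   EndpointsForToken replicas = strategy.getNaturalReplicasForToken(token);
 *   // Served from the ring-version-scoped cache when possible, otherwise recomputed via
 *   // calculateNaturalReplicas(...) and cached for the current ring version.
 */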
public Replica getLocalReplicaFor(RingPosition<?> searchPosition)
{
return getNaturalReplicas(searchPosition)
.byEndpoint()
.get(FBUtilities.getBroadcastAddressAndPort());
}
/**
* Calculate the natural endpoints for the given token. Endpoints are returned in the order
* they occur in the ring following the searchToken, as defined by the replication strategy.
*
* Note that the order of the replicas is _implicitly relied upon_ by the definition of
* "primary" range in
* {@link org.apache.cassandra.service.StorageService#getPrimaryRangesForEndpoint(String, InetAddressAndPort)}
* which is in turn relied on by various components like repair and size estimate calculations.
*
* @see #getNaturalReplicasForToken(org.apache.cassandra.dht.RingPosition)
*
* @param searchToken the token to find the natural endpoints for
* @param tokenMetadata the token metadata used to find the searchToken, e.g. contains token to endpoint
* mapping information
* @return a copy of the natural endpoints for the given token
*/
public abstract EndpointsForRange calculateNaturalReplicas(Token searchToken, TokenMetadata tokenMetadata);
public <T> AbstractWriteResponseHandler<T> getWriteResponseHandler(ReplicaPlan.ForTokenWrite replicaPlan,
Runnable callback,
WriteType writeType,
long queryStartNanoTime)
{
return getWriteResponseHandler(replicaPlan, callback, writeType, queryStartNanoTime, DatabaseDescriptor.getIdealConsistencyLevel());
}
public <T> AbstractWriteResponseHandler<T> getWriteResponseHandler(ReplicaPlan.ForTokenWrite replicaPlan,
Runnable callback,
WriteType writeType,
long queryStartNanoTime,
ConsistencyLevel idealConsistencyLevel)
{
AbstractWriteResponseHandler<T> resultResponseHandler;
if (replicaPlan.consistencyLevel().isDatacenterLocal())
{
// the number of nodes to block for, in this context, counts only nodes in the local datacenter.
resultResponseHandler = new DatacenterWriteResponseHandler<T>(replicaPlan, callback, writeType, queryStartNanoTime);
}
else if (replicaPlan.consistencyLevel() == ConsistencyLevel.EACH_QUORUM && (this instanceof NetworkTopologyStrategy))
{
resultResponseHandler = new DatacenterSyncWriteResponseHandler<T>(replicaPlan, callback, writeType, queryStartNanoTime);
}
else
{
resultResponseHandler = new WriteResponseHandler<T>(replicaPlan, callback, writeType, queryStartNanoTime);
}
//Check if tracking the ideal consistency level is configured
if (idealConsistencyLevel != null)
{
//If the ideal and requested consistency levels are the same, just use this handler to track the ideal consistency level.
//This also lets the handler know, at construction time, that it is the ideal-CL handler for tracking purposes.
if (idealConsistencyLevel == replicaPlan.consistencyLevel())
{
resultResponseHandler.setIdealCLResponseHandler(resultResponseHandler);
}
else
{
//Construct a delegate response handler used to track the ideal consistency level
AbstractWriteResponseHandler<T> idealHandler = getWriteResponseHandler(replicaPlan.withConsistencyLevel(idealConsistencyLevel),
callback,
writeType,
queryStartNanoTime,
idealConsistencyLevel);
resultResponseHandler.setIdealCLResponseHandler(idealHandler);
}
}
return resultResponseHandler;
}
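/*
 * Illustrative sketch (not part of this class): obtaining a handler for a token write. The plan,
 * callback, write type and timestamp below are hypothetical placeholders supplied by the write
 * path; the handler chosen follows the branches above (datacenter-local, EACH_QUORUM with
 * NetworkTopologyStrategy, or the plain handler), with an ideal-CL delegate attached if configured.
 *
 *   ReplicaPlan.ForTokenWrite plan = ...; // built by the coordinator for this mutation
 *   AbstractWriteResponseHandler<IMutation> handler =
 *       strategy.getWriteResponseHandler(plan, null, WriteType.SIMPLE, System.nanoTime());
 *   // handler.get() then blocks until the consistency level is satisfied or the write times out.
 */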
/**
* Calculate the replication factor based on strategy_options. When overriding, ensure that this
* method is FAST, as it is called often.
*
* @return the replication factor
*/
public abstract ReplicationFactor getReplicationFactor();
public boolean hasTransientReplicas()
{
return getReplicationFactor().hasTransientReplicas();
}
/*
* NOTE: this is pretty inefficient, as is the inverse (getRangeAddresses) below.
* That is fine as long as we don't use either on any critical path.
* (Fixing this would probably require merging TokenMetadata into the replication strategy,
* so we could cache/invalidate cleanly.)
*/
public RangesByEndpoint getAddressReplicas(TokenMetadata metadata)
{
RangesByEndpoint.Builder map = new RangesByEndpoint.Builder();
for (Token token : metadata.sortedTokens())
{
Range<Token> range = metadata.getPrimaryRangeFor(token);
for (Replica replica : calculateNaturalReplicas(token, metadata))
{
// LocalStrategy always returns (min, min] ranges for its replicas, so we skip the check here
Preconditions.checkState(range.equals(replica.range()) || this instanceof LocalStrategy);
map.put(replica.endpoint(), replica);
}
}
return map.build();
}
public RangesAtEndpoint getAddressReplicas(TokenMetadata metadata, InetAddressAndPort endpoint)
{
RangesAtEndpoint.Builder builder = RangesAtEndpoint.builder(endpoint);
for (Token token : metadata.sortedTokens())
{
Range<Token> range = metadata.getPrimaryRangeFor(token);
Replica replica = calculateNaturalReplicas(token, metadata)
.byEndpoint().get(endpoint);
if (replica != null)
{
// LocalStrategy always returns (min, min] ranges for its replicas, so we skip the check here
Preconditions.checkState(range.equals(replica.range()) || this instanceof LocalStrategy);
builder.add(replica, Conflict.DUPLICATE);
}
}
return builder.build();
}
public EndpointsByRange getRangeAddresses(TokenMetadata metadata)
{
EndpointsByRange.Builder map = new EndpointsByRange.Builder();
for (Token token : metadata.sortedTokens())
{
Range<Token> range = metadata.getPrimaryRangeFor(token);
for (Replica replica : calculateNaturalReplicas(token, metadata))
{
// LocalStrategy always returns (min, min] ranges for its replicas, so we skip the check here
Preconditions.checkState(range.equals(replica.range()) || this instanceof LocalStrategy);
map.put(range, replica);
}
}
return map.build();
}
public RangesByEndpoint getAddressReplicas()
{
return getAddressReplicas(tokenMetadata.cloneOnlyTokenMap());
}
public RangesAtEndpoint getAddressReplicas(InetAddressAndPort endpoint)
{
return getAddressReplicas(tokenMetadata.cloneOnlyTokenMap(), endpoint);
}
public RangesAtEndpoint getPendingAddressRanges(TokenMetadata metadata, Token pendingToken, InetAddressAndPort pendingAddress)
{
return getPendingAddressRanges(metadata, Collections.singleton(pendingToken), pendingAddress);
}
public RangesAtEndpoint getPendingAddressRanges(TokenMetadata metadata, Collection<Token> pendingTokens, InetAddressAndPort pendingAddress)
{
TokenMetadata temp = metadata.cloneOnlyTokenMap();
temp.updateNormalTokens(pendingTokens, pendingAddress);
return getAddressReplicas(temp, pendingAddress);
}
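/*
 * Illustrative sketch (not part of this class): pending ranges for a joining node are computed by
 * cloning the token map, treating the pending tokens as if they were already normal, and asking
 * which ranges that endpoint would then own. The endpoint and token below are hypothetical.
 *
 *   RangesAtEndpoint pending = strategy.getPendingAddressRanges(metadata, newToken, joiningEndpoint);
 */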
public abstract void validateOptions() throws ConfigurationException;
public abstract void maybeWarnOnOptions();
/*
* The options recognized by the strategy.
* The empty collection means that no options are accepted, but null means
* that any option is accepted.
*/
public Collection<String> recognizedOptions()
{
// We default to null for backward compatibility's sake
return null;
}
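/*
 * Illustrative sketch (not part of this class): a subclass that accepts only a fixed set of
 * options would override recognizedOptions() instead of relying on the permissive null default,
 * for example:
 *
 *   @Override
 *   public Collection<String> recognizedOptions()
 *   {
 *       return Collections.singleton("replication_factor");
 *   }
 *
 * validateExpectedOptions() then rejects any other key with a ConfigurationException.
 */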
private static AbstractReplicationStrategy createInternal(String keyspaceName,
Class<? extends AbstractReplicationStrategy> strategyClass,
TokenMetadata tokenMetadata,
IEndpointSnitch snitch,
Map<String, String> strategyOptions)
throws ConfigurationException
{
AbstractReplicationStrategy strategy;
Class<?>[] parameterTypes = new Class[] {String.class, TokenMetadata.class, IEndpointSnitch.class, Map.class};
try
{
Constructor<? extends AbstractReplicationStrategy> constructor = strategyClass.getConstructor(parameterTypes);
strategy = constructor.newInstance(keyspaceName, tokenMetadata, snitch, strategyOptions);
}
catch (InvocationTargetException e)
{
Throwable targetException = e.getTargetException();
throw new ConfigurationException(targetException.getMessage(), targetException);
}
catch (Exception e)
{
throw new ConfigurationException("Error constructing replication strategy class", e);
}
return strategy;
}
public static AbstractReplicationStrategy createReplicationStrategy(String keyspaceName,
Class<? extends AbstractReplicationStrategy> strategyClass,
TokenMetadata tokenMetadata,
IEndpointSnitch snitch,
Map<String, String> strategyOptions)
{
AbstractReplicationStrategy strategy = createInternal(keyspaceName, strategyClass, tokenMetadata, snitch, strategyOptions);
// Because unrecognized options were historically not validated properly, we only log a warning if we find one.
try
{
strategy.validateExpectedOptions();
}
catch (ConfigurationException e)
{
logger.warn("Ignoring {}", e.getMessage());
}
strategy.validateOptions();
return strategy;
}
/**
* Before constructing the ARS we first give it a chance to prepare the options map in any way it
* would like to, for example datacenter auto-expansion or other templating that makes the user
* interface more usable. Note that this may mutate the passed strategyOptions Map.
*
* We do this prior to the construction of the strategyClass itself because at that point the option
* map is already immutable and comes from {@link org.apache.cassandra.schema.ReplicationParams}
* (and should probably stay that way so we don't start having bugs related to ReplicationParams being mutable).
* Instead ARS classes get a static hook here via the prepareOptions(Map, Map) method to mutate the user input
* before it becomes an immutable part of the ReplicationParams.
*
* @param strategyClass The class to call prepareOptions on
* @param strategyOptions The proposed strategy options that will be potentially mutated by the prepareOptions
* method.
* @param previousStrategyOptions In the case of an ALTER statement, the previous strategy options of this class.
* This map cannot be mutated.
*/
public static void prepareReplicationStrategyOptions(Class<? extends AbstractReplicationStrategy> strategyClass,
Map<String, String> strategyOptions,
Map<String, String> previousStrategyOptions)
{
try
{
Method method = strategyClass.getDeclaredMethod("prepareOptions", Map.class, Map.class);
method.invoke(null, strategyOptions, previousStrategyOptions);
}
catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException ign)
{
// If the subclass doesn't specify a prepareOptions method, then that means that it
// doesn't want to do anything to the options. So do nothing on reflection related exceptions.
}
}
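/*
 * Illustrative sketch (not part of this class): the static hook a subclass may declare so that
 * prepareReplicationStrategyOptions above picks it up reflectively. The body shown is a
 * hypothetical example of option templating, not the behaviour of any particular strategy.
 *
 *   protected static void prepareOptions(Map<String, String> options,
 *                                        Map<String, String> previousOptions)
 *   {
 *       // e.g. carry forward options from the previous definition that the user omitted
 *       previousOptions.forEach(options::putIfAbsent);
 *   }
 */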
public static void validateReplicationStrategy(String keyspaceName,
Class<? extends AbstractReplicationStrategy> strategyClass,
TokenMetadata tokenMetadata,
IEndpointSnitch snitch,
Map<String, String> strategyOptions) throws ConfigurationException
{
AbstractReplicationStrategy strategy = createInternal(keyspaceName, strategyClass, tokenMetadata, snitch, strategyOptions);
strategy.validateExpectedOptions();
strategy.validateOptions();
strategy.maybeWarnOnOptions();
if (strategy.hasTransientReplicas() && !DatabaseDescriptor.isTransientReplicationEnabled())
{
throw new ConfigurationException("Transient replication is disabled. Enable in cassandra.yaml to use.");
}
}
public static Class<AbstractReplicationStrategy> getClass(String cls) throws ConfigurationException
{
String className = cls.contains(".") ? cls : "org.apache.cassandra.locator." + cls;
if ("org.apache.cassandra.locator.OldNetworkTopologyStrategy".equals(className)) // see CASSANDRA-16301
throw new ConfigurationException("The support for the OldNetworkTopologyStrategy has been removed in C* version 4.0. The keyspace strategy should be switch to NetworkTopologyStrategy");
Class<AbstractReplicationStrategy> strategyClass = FBUtilities.classForName(className, "replication strategy");
if (!AbstractReplicationStrategy.class.isAssignableFrom(strategyClass))
{
throw new ConfigurationException(String.format("Specified replication strategy class (%s) is not derived from AbstractReplicationStrategy", className));
}
return strategyClass;
}
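/*
 * Illustrative sketch (not part of this class): class names given without a package are resolved
 * under org.apache.cassandra.locator, so both of these hypothetical calls yield the same class.
 *
 *   Class<AbstractReplicationStrategy> a = getClass("NetworkTopologyStrategy");
 *   Class<AbstractReplicationStrategy> b = getClass("org.apache.cassandra.locator.NetworkTopologyStrategy");
 *   assert a == b;
 */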
public boolean hasSameSettings(AbstractReplicationStrategy other)
{
return getClass().equals(other.getClass()) && getReplicationFactor().equals(other.getReplicationFactor());
}
protected void validateReplicationFactor(String s) throws ConfigurationException
{
try
{
ReplicationFactor rf = ReplicationFactor.fromString(s);
if (rf.hasTransientReplicas())
{
if (DatabaseDescriptor.getNumTokens() > 1)
throw new ConfigurationException("Transient replication is not supported with vnodes yet");
}
}
catch (IllegalArgumentException e)
{
throw new ConfigurationException(e.getMessage());
}
}
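/*
 * Illustrative sketch (not part of this class), assuming the "<full>/<transient>" syntax accepted
 * by ReplicationFactor.fromString:
 *
 *   validateReplicationFactor("3");   // three full replicas
 *   validateReplicationFactor("3/1"); // three replicas, one of them transient; additionally
 *                                     // rejected above when vnodes (num_tokens > 1) are in use
 */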
protected void validateExpectedOptions() throws ConfigurationException
{
Collection<String> expectedOptions = recognizedOptions();
if (expectedOptions == null)
return;
for (String key : configOptions.keySet())
{
if (!expectedOptions.contains(key))
throw new ConfigurationException(String.format("Unrecognized strategy option {%s} passed to %s for keyspace %s", key, getClass().getSimpleName(), keyspaceName));
}
}
static class ReplicaCache<K, V>
{
private final AtomicReference<ReplicaHolder<K, V>> cachedReplicas = new AtomicReference<>(new ReplicaHolder<>(0, 4));
V get(long ringVersion, K keyToken)
{
ReplicaHolder<K, V> replicaHolder = maybeClearAndGet(ringVersion);
if (replicaHolder == null)
return null;
return replicaHolder.replicas.get(keyToken);
}
void put(long ringVersion, K keyToken, V endpoints)
{
ReplicaHolder<K, V> current = maybeClearAndGet(ringVersion);
if (current != null)
{
// if we have the same ringVersion and already know about the keyToken, the endpoints should be the same
current.replicas.putIfAbsent(keyToken, endpoints);
}
}
ReplicaHolder<K, V> maybeClearAndGet(long ringVersion)
{
ReplicaHolder<K, V> current = cachedReplicas.get();
if (ringVersion == current.ringVersion)
return current;
else if (ringVersion < current.ringVersion) // things have already moved on
return null;
// If the ring version has changed, create a fresh replica holder and try to replace the current one.
// This may race with other threads that have the same new ring version; one will win and the losers
// will be garbage collected.
ReplicaHolder<K, V> cleaned = new ReplicaHolder<>(ringVersion, current.replicas.size());
cachedReplicas.compareAndSet(current, cleaned);
// A new ring version may have come along while making the new holder, so re-check the
// reference and return the holder if the ring version still matches; otherwise return null as
// there is no point in using it.
current = cachedReplicas.get();
if (ringVersion == current.ringVersion)
return current;
else
return null;
}
}
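/*
 * Illustrative sketch (not part of this class): the intended get/put pattern for ReplicaCache, as
 * used by getNaturalReplicas above. The whole holder is swapped via compareAndSet when the ring
 * version advances, so stale entries are dropped wholesale rather than invalidated one by one.
 *
 *   EndpointsForRange cached = replicas.get(ringVersion, keyToken);
 *   if (cached == null)
 *   {
 *       EndpointsForRange computed = calculateNaturalReplicas(searchToken, tm);
 *       replicas.put(tm.getRingVersion(), keyToken, computed);
 *   }
 */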
static class ReplicaHolder<K, V>
{
private final long ringVersion;
private final NonBlockingHashMap<K, V> replicas;
ReplicaHolder(long ringVersion, int expectedEntries)
{
this.ringVersion = ringVersion;
this.replicas = new NonBlockingHashMap<>(expectedEntries);
}
}
}