/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
| package org.apache.cassandra.locator; |
| |
| import java.util.*; |
| import java.util.Map.Entry; |
| |
| import org.apache.cassandra.locator.ReplicaCollection.Builder.Conflict; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.cassandra.dht.Datacenters; |
| import org.apache.cassandra.dht.Range; |
| import org.apache.cassandra.exceptions.ConfigurationException; |
| import org.apache.cassandra.dht.Token; |
| import org.apache.cassandra.locator.TokenMetadata.Topology; |
| import org.apache.cassandra.schema.SchemaConstants; |
| import org.apache.cassandra.service.ClientWarn; |
| import org.apache.cassandra.service.StorageService; |
| import org.apache.cassandra.utils.FBUtilities; |
| import org.apache.cassandra.utils.Pair; |
| |
| import com.google.common.collect.ImmutableMultimap; |
| import com.google.common.collect.Multimap; |
| import com.google.common.collect.Multimaps; |
| |
/**
 * <p>
 * This replication strategy takes a property map that gives the intended
 * replication factor in each datacenter. The sum of the datacenter
 * replication factor values should equal the keyspace replication
 * factor.
 * </p>
 * <p>
 * So, for example, if the keyspace replication factor is 6, the
 * datacenter replication factors could be 3, 2, and 1 - so 3 replicas in
 * one datacenter, 2 in another, and 1 in another - totaling 6.
 * </p>
 * This class also caches the endpoints and invalidates the cache if there is a
 * change in the number of tokens.
 */
public class NetworkTopologyStrategy extends AbstractReplicationStrategy
{
    /**
     * Option key carrying a single, cluster-wide replication factor. It is only legal as user input:
     * {@link #prepareOptions} expands it into concrete per-datacenter entries before the strategy is
     * constructed, and both the constructor and {@link #validateOptions} reject it if it survives.
     */
    public static final String REPLICATION_FACTOR = "replication_factor";

    // Per-datacenter replication factors keyed by datacenter name; wrapped unmodifiable at construction.
    private final Map<String, ReplicationFactor> datacenters;
    // Sum over all datacenters of full + transient replicas, i.e. the keyspace-wide replication factor.
    private final ReplicationFactor aggregateRf;
    private static final Logger logger = LoggerFactory.getLogger(NetworkTopologyStrategy.class);

    /**
     * Builds the strategy from per-datacenter options: each {@code configOptions} entry maps a
     * datacenter name to a replication factor string (parsed via {@link ReplicationFactor#fromString},
     * so transient notation is honoured).
     *
     * @throws ConfigurationException if a raw {@value #REPLICATION_FACTOR} key is still present;
     *         it should have been expanded away by {@link #prepareOptions} before we get here
     */
    public NetworkTopologyStrategy(String keyspaceName, TokenMetadata tokenMetadata, IEndpointSnitch snitch, Map<String, String> configOptions) throws ConfigurationException
    {
        super(keyspaceName, tokenMetadata, snitch, configOptions);

        int replicas = 0;
        int trans = 0;
        Map<String, ReplicationFactor> newDatacenters = new HashMap<>();
        if (configOptions != null)
        {
            for (Entry<String, String> entry : configOptions.entrySet())
            {
                String dc = entry.getKey();
                // prepareOptions should have transformed any "replication_factor" options by now
                if (dc.equalsIgnoreCase(REPLICATION_FACTOR))
                    throw new ConfigurationException(REPLICATION_FACTOR + " should not appear as an option at construction time for NetworkTopologyStrategy");
                ReplicationFactor rf = ReplicationFactor.fromString(entry.getValue());
                // Accumulate the keyspace-wide totals while recording the per-DC factor.
                replicas += rf.allReplicas;
                trans += rf.transientReplicas();
                newDatacenters.put(dc, rf);
            }
        }

        datacenters = Collections.unmodifiableMap(newDatacenters);
        aggregateRf = ReplicationFactor.withTransient(replicas, trans);
        logger.info("Configured datacenter replicas are {}", FBUtilities.toString(datacenters));
    }

    /**
     * Endpoint adder applying the replication rules for a given DC: prefer one replica per distinct
     * rack, and only reuse racks when the DC has fewer racks than the (effective) replication factor.
     */
    private static final class DatacenterEndpoints
    {
        /** List accepted endpoints get pushed into. */
        EndpointsForRange.Builder replicas;

        /**
         * Racks encountered so far. Replicas are put into separate racks while possible.
         * For efficiency the set is shared between the instances, using the location pair (dc, rack) to make sure
         * clashing names aren't a problem.
         */
        Set<Pair<String, String>> racks;

        /** Number of replicas left to fill from this DC. */
        int rfLeft;
        // How many further replicas may be placed in racks we have already used. Starts at
        // rf.allReplicas - rackCount: non-positive while distinct racks can still cover the RF.
        int acceptableRackRepeats;
        // Number of replicas in this DC that will be transient. Because fullness is decided by
        // rfLeft > transients in addEndpointAndCheckIfDone, the LAST `transients` accepted
        // endpoints are the transient ones.
        int transients;

        DatacenterEndpoints(ReplicationFactor rf,
                            int rackCount,
                            int nodeCount,
                            EndpointsForRange.Builder replicas,
                            Set<Pair<String, String>> racks)
        {
            this.replicas = replicas;
            this.racks = racks;
            // If there aren't enough nodes in this DC to fill the RF, the number of nodes is the effective RF.
            this.rfLeft = Math.min(rf.allReplicas, nodeCount);
            // If there aren't enough racks in this DC to fill the RF, we'll still use at least one node from each rack,
            // and the difference is to be filled by the first encountered nodes.
            acceptableRackRepeats = rf.allReplicas - rackCount;

            // if we have fewer replicas than rf calls for, reduce transients accordingly
            int reduceTransients = rf.allReplicas - this.rfLeft;
            transients = Math.max(rf.transientReplicas() - reduceTransients, 0);
            ReplicationFactor.validate(rfLeft, transients);
        }

        /**
         * Attempts to add an endpoint to the replicas for this datacenter, adding to the replicas set if successful.
         * Returns true if the endpoint was added, and this datacenter does not require further replicas.
         */
        boolean addEndpointAndCheckIfDone(InetAddressAndPort ep, Pair<String,String> location, Range<Token> replicatedRange)
        {
            if (done())
                return false;

            if (replicas.endpoints().contains(ep))
                // Cannot repeat a node.
                return false;

            // rfLeft > transients: the earlier accepted endpoints are full replicas, the trailing
            // `transients` ones are transient.
            Replica replica = new Replica(ep, replicatedRange, rfLeft > transients);

            if (racks.add(location))
            {
                // New rack.
                --rfLeft;
                replicas.add(replica, Conflict.NONE);
                return done();
            }
            if (acceptableRackRepeats <= 0)
                // There must be rfLeft distinct racks left, do not add any more rack repeats.
                return false;

            replicas.add(replica, Conflict.NONE);
            // Added a node that is from an already met rack to match RF when there aren't enough racks.
            --acceptableRackRepeats;
            --rfLeft;
            return done();
        }

        /** True once this DC has accepted its full (effective) replication factor. */
        boolean done()
        {
            assert rfLeft >= 0;
            return rfLeft == 0;
        }
    }

    /**
     * calculate endpoints in one pass through the tokens by tracking our progress in each DC.
     * Walks the ring starting at {@code searchToken}, offering each owner to its datacenter's
     * {@link DatacenterEndpoints} until every non-empty configured DC has been filled.
     */
    @Override
    public EndpointsForRange calculateNaturalReplicas(Token searchToken, TokenMetadata tokenMetadata)
    {
        // we want to preserve insertion order so that the first added endpoint becomes primary
        ArrayList<Token> sortedTokens = tokenMetadata.sortedTokens();
        // The range these replicas cover: (predecessor(firstToken), firstToken].
        Token replicaEnd = TokenMetadata.firstToken(sortedTokens, searchToken);
        Token replicaStart = tokenMetadata.getPredecessor(replicaEnd);
        Range<Token> replicatedRange = new Range<>(replicaStart, replicaEnd);

        EndpointsForRange.Builder builder = new EndpointsForRange.Builder(replicatedRange);
        // Shared across all DatacenterEndpoints instances; keyed by (dc, rack) so rack-name clashes
        // between datacenters are harmless.
        Set<Pair<String, String>> seenRacks = new HashSet<>();

        Topology topology = tokenMetadata.getTopology();
        // all endpoints in each DC, so we can check when we have exhausted all the members of a DC
        Multimap<String, InetAddressAndPort> allEndpoints = topology.getDatacenterEndpoints();
        // all racks in a DC so we can check when we have exhausted all racks in a DC
        Map<String, ImmutableMultimap<String, InetAddressAndPort>> racks = topology.getDatacenterRacks();
        assert !allEndpoints.isEmpty() && !racks.isEmpty() : "not aware of any cluster members";

        int dcsToFill = 0;
        Map<String, DatacenterEndpoints> dcs = new HashMap<>(datacenters.size() * 2);

        // Create a DatacenterEndpoints object for each non-empty DC.
        for (Map.Entry<String, ReplicationFactor> en : datacenters.entrySet())
        {
            String dc = en.getKey();
            ReplicationFactor rf = en.getValue();
            int nodeCount = sizeOrZero(allEndpoints.get(dc));

            // A DC with RF 0 or no members contributes nothing and never needs filling.
            if (rf.allReplicas <= 0 || nodeCount <= 0)
                continue;

            DatacenterEndpoints dcEndpoints = new DatacenterEndpoints(rf, sizeOrZero(racks.get(dc)), nodeCount, builder, seenRacks);
            dcs.put(dc, dcEndpoints);
            ++dcsToFill;
        }

        // Single pass around the ring; stop as soon as every tracked DC reports done.
        Iterator<Token> tokenIter = TokenMetadata.ringIterator(sortedTokens, searchToken, false);
        while (dcsToFill > 0 && tokenIter.hasNext())
        {
            Token next = tokenIter.next();
            InetAddressAndPort ep = tokenMetadata.getEndpoint(next);
            Pair<String, String> location = topology.getLocation(ep);
            DatacenterEndpoints dcEndpoints = dcs.get(location.left);
            if (dcEndpoints != null && dcEndpoints.addEndpointAndCheckIfDone(ep, location, replicatedRange))
                --dcsToFill;
        }
        return builder.build();
    }

    /** Number of distinct keys in {@code collection} (here: rack count for a DC), or 0 if the DC is unknown. */
    private int sizeOrZero(Multimap<?, ?> collection)
    {
        return collection != null ? collection.asMap().size() : 0;
    }

    /** Size of {@code collection}, or 0 when null. */
    private int sizeOrZero(Collection<?> collection)
    {
        return collection != null ? collection.size() : 0;
    }

    /** @return the aggregate (keyspace-wide) replication factor across all configured datacenters. */
    @Override
    public ReplicationFactor getReplicationFactor()
    {
        return aggregateRf;
    }

    /** @return the replication factor configured for {@code dc}, or {@link ReplicationFactor#ZERO} if unconfigured. */
    public ReplicationFactor getReplicationFactor(String dc)
    {
        ReplicationFactor replicas = datacenters.get(dc);
        return replicas == null ? ReplicationFactor.ZERO : replicas;
    }

    /** @return the names of all datacenters this strategy has a replication factor for. */
    public Set<String> getDatacenters()
    {
        return datacenters.keySet();
    }

    @Override
    public Collection<String> recognizedOptions()
    {
        // only valid options are valid DC names.
        return Datacenters.getValidDatacenters();
    }

    /**
     * Support datacenter auto-expansion for CASSANDRA-14303. This hook allows us to safely auto-expand
     * the "replication_factor" options out into the known datacenters. It is called via reflection from
     * {@link AbstractReplicationStrategy#prepareReplicationStrategyOptions(Class, Map, Map)}.
     *
     * @param options The proposed strategy options that will be potentially mutated
     * @param previousOptions Any previous strategy options in the case of an ALTER statement
     */
    protected static void prepareOptions(Map<String, String> options, Map<String, String> previousOptions)
    {
        String replication = options.remove(REPLICATION_FACTOR);

        if (replication == null && options.size() == 0)
        {
            // Support direct alters from SimpleStrategy to NTS
            replication = previousOptions.get(REPLICATION_FACTOR);
        }
        else if (replication != null)
        {
            // When datacenter auto-expansion occurs in e.g. an ALTER statement (meaning that the previousOptions
            // map is not empty) we choose not to alter existing datacenter replication levels for safety.
            previousOptions.entrySet().stream()
                           .filter(e -> !e.getKey().equals(REPLICATION_FACTOR)) // SimpleStrategy conversions
                           .forEach(e -> options.putIfAbsent(e.getKey(), e.getValue()));
        }

        if (replication != null)
        {
            // Expand the single factor to every known DC, without overriding explicit per-DC settings.
            ReplicationFactor defaultReplicas = ReplicationFactor.fromString(replication);
            Datacenters.getValidDatacenters()
                       .forEach(dc -> options.putIfAbsent(dc, defaultReplicas.toParseableString()));
        }

        // Drop datacenters explicitly set to "0" replicas entirely.
        options.values().removeAll(Collections.singleton("0"));
    }

    @Override
    protected void validateExpectedOptions() throws ConfigurationException
    {
        // Do not accept query with no data centers specified.
        if (this.configOptions.isEmpty())
        {
            throw new ConfigurationException("Configuration for at least one datacenter must be present");
        }

        // Validate the data center names
        super.validateExpectedOptions();
    }

    /**
     * Validates each per-DC option value as a replication factor, and rejects a leftover raw
     * {@value #REPLICATION_FACTOR} key.
     */
    @Override
    public void validateOptions() throws ConfigurationException
    {
        for (Entry<String, String> e : this.configOptions.entrySet())
        {
            // prepareOptions should have transformed any "replication_factor" by now
            if (e.getKey().equalsIgnoreCase(REPLICATION_FACTOR))
                throw new ConfigurationException(REPLICATION_FACTOR + " should not appear as an option to NetworkTopologyStrategy");
            validateReplicationFactor(e.getValue());
        }
    }

    /**
     * Emits a client warning (and server log) for any non-system keyspace whose full replica count in
     * some datacenter exceeds the number of nodes currently in that datacenter.
     */
    @Override
    public void maybeWarnOnOptions()
    {
        if (!SchemaConstants.isSystemKeyspace(keyspaceName))
        {
            // Group the current cluster members by datacenter, as reported by the snitch.
            ImmutableMultimap<String, InetAddressAndPort> dcsNodes = Multimaps.index(StorageService.instance.getTokenMetadata().getAllMembers(), snitch::getDatacenter);
            for (Entry<String, String> e : this.configOptions.entrySet())
            {

                String dc = e.getKey();
                ReplicationFactor rf = getReplicationFactor(dc);
                int nodeCount = dcsNodes.get(dc).size();
                // nodeCount==0 on many tests
                if (rf.fullReplicas > nodeCount && nodeCount != 0)
                {
                    String msg = "Your replication factor " + rf.fullReplicas
                                 + " for keyspace "
                                 + keyspaceName
                                 + " is higher than the number of nodes "
                                 + nodeCount
                                 + " for datacenter "
                                 + dc;
                    ClientWarn.instance.warn(msg);
                    logger.warn(msg);
                }
            }
        }
    }

    @Override
    public boolean hasSameSettings(AbstractReplicationStrategy other)
    {
        // NOTE(review): the cast relies on super.hasSameSettings() only returning true for another
        // NetworkTopologyStrategy (&& short-circuits before the cast) — confirm in AbstractReplicationStrategy.
        return super.hasSameSettings(other) && ((NetworkTopologyStrategy) other).datacenters.equals(datacenters);
    }
}