blob: 2a3d0be54b829a62356b8c40f4578004d26654ae [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cassandra.spark.data.partitioner;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import com.google.common.base.Preconditions;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import com.google.common.collect.Range;
import com.google.common.collect.RangeMap;
import com.google.common.collect.TreeRangeMap;
import org.apache.commons.lang.builder.EqualsBuilder;
import org.apache.commons.lang.builder.HashCodeBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import org.apache.cassandra.spark.data.ReplicationFactor;
import org.apache.cassandra.spark.utils.RangeUtils;
import static org.apache.cassandra.spark.data.ReplicationFactor.ReplicationStrategy.SimpleStrategy;
/**
* CassandraRing is designed to have one unique way of handling
* Cassandra token/topology information across all Cassandra tooling.
* This class is made Serializable so it's easy to use it from Hadoop/Spark.
* As Cassandra token ranges are dependent on Replication strategy, ring makes sense for a specific keyspace only.
* It is made to be immutable for the sake of simplicity.
* <p>
* Token ranges are calculated assuming Cassandra racks are not being used, but controlled by assigning tokens properly.
* <p>
* {@link #equals(Object)} and {@link #hashCode()} don't take {@link #replicas} and {@link #tokenRangeMap}
* into consideration as they are just derived fields.
*/
@SuppressWarnings({"UnstableApiUsage", "unused", "WeakerAccess"})
public class CassandraRing implements Serializable
{
    private static final Logger LOGGER = LoggerFactory.getLogger(CassandraRing.class);

    // Limits of the JDK wire format used by writeObject/readObject:
    // the option count is stored in one byte and the instance count in two bytes, both read back unsigned
    private static final int MAX_SERIALIZED_OPTIONS = 0xFF;
    private static final int MAX_SERIALIZED_INSTANCES = 0xFFFF;

    // Core state; not final only because readObject has to assign it
    private Partitioner partitioner;
    private String keyspace;
    private ReplicationFactor replicationFactor;
    private ArrayList<CassandraInstance> instances;

    // Derived state, rebuilt by init() and therefore never serialized or compared
    private transient RangeMap<BigInteger, List<CassandraInstance>> replicas;
    private transient Multimap<CassandraInstance, Range<BigInteger>> tokenRangeMap;

    /**
     * Add a replica with given range to replicaMap (RangeMap pointing to replicas).
     * <p>
     * replicaMap starts with full range (representing complete ring) with empty list of replicas. So, it is
     * guaranteed that range will match one or many ranges in replicaMap.
     * <p>
     * Scheme to add a new replica for a range:
     * <ul>
     *   <li>Find overlapping rangeMap entries from replicaMap</li>
     *   <li>For each overlapping range, create new replica list by adding new replica to the existing list
     *       and add it back to replicaMap</li>
     * </ul>
     *
     * @param replica    instance to register as a replica
     * @param range      non-wrapping token range owned by the replica
     * @param replicaMap map from token range to replica list, updated in place
     * @throws IllegalArgumentException if the lower endpoint of the range is greater than its upper endpoint
     */
    private static void addReplica(CassandraInstance replica,
                                   Range<BigInteger> range,
                                   RangeMap<BigInteger, List<CassandraInstance>> replicaMap)
    {
        Preconditions.checkArgument(range.lowerEndpoint().compareTo(range.upperEndpoint()) <= 0,
                                    "Range calculations assume range is not wrapped");

        RangeMap<BigInteger, List<CassandraInstance>> replicaRanges = replicaMap.subRangeMap(range);
        // Collect the updates separately so that the sub-range view is not modified while it is iterated
        RangeMap<BigInteger, List<CassandraInstance>> mappingsToAdd = TreeRangeMap.create();

        replicaRanges.asMapOfRanges().forEach((key, value) -> {
            // Copy the list: the existing value may be shared or immutable (e.g. the initial empty list)
            List<CassandraInstance> replicas = new ArrayList<>(value);
            replicas.add(replica);
            mappingsToAdd.put(key, replicas);
        });
        replicaMap.putAll(mappingsToAdd);
    }

    /**
     * Builds a ring for a single keyspace and eagerly computes the derived token range mappings.
     *
     * @param partitioner       partitioner of the cluster
     * @param keyspace          keyspace name
     * @param replicationFactor replication settings of the keyspace
     * @param instances         instances of the ring; copied and sorted by token in ascending numeric order
     */
    public CassandraRing(Partitioner partitioner,
                         String keyspace,
                         ReplicationFactor replicationFactor,
                         Collection<CassandraInstance> instances)
    {
        this.partitioner = partitioner;
        this.keyspace = keyspace;
        this.replicationFactor = replicationFactor;
        this.instances = instances.stream()
                                  .sorted(Comparator.comparing(instance -> new BigInteger(instance.token())))
                                  .collect(Collectors.toCollection(ArrayList::new));
        this.init();
    }

    /**
     * (Re)computes the derived fields {@link #tokenRangeMap} and {@link #replicas} from the core fields.
     * Called from the constructor and after JDK deserialization.
     *
     * @throws UnsupportedOperationException if the replication strategy is neither SimpleStrategy
     *                                       nor NetworkTopologyStrategy
     */
    private void init()
    {
        // Setup token range map
        replicas = TreeRangeMap.create();
        tokenRangeMap = ArrayListMultimap.create();

        // Calculate instance to token ranges mapping
        switch (replicationFactor.getReplicationStrategy())
        {
            case SimpleStrategy:
                tokenRangeMap.putAll(RangeUtils.calculateTokenRanges(instances,
                                                                     replicationFactor.getTotalReplicationFactor(),
                                                                     partitioner));
                break;
            case NetworkTopologyStrategy:
                for (String dataCenter : dataCenters())
                {
                    int rf = replicationFactor.getOptions().get(dataCenter);
                    if (rf == 0)
                    {
                        // Nothing is replicated to this data center
                        continue;
                    }
                    // Data center names are literal identifiers, so compare with equals() rather than
                    // String.matches(), which would interpret the name as a regular expression
                    List<CassandraInstance> dcInstances = instances.stream()
                                                                   .filter(instance -> instance.dataCenter().equals(dataCenter))
                                                                   .collect(Collectors.toList());
                    tokenRangeMap.putAll(RangeUtils.calculateTokenRanges(dcInstances, rf, partitioner));
                }
                break;
            default:
                throw new UnsupportedOperationException("Unsupported replication strategy");
        }

        // Calculate token range to replica mapping, starting from the full ring with no replicas
        replicas.put(Range.closed(partitioner.minToken(), partitioner.maxToken()), Collections.emptyList());
        tokenRangeMap.asMap().forEach((instance, ranges) -> ranges.forEach(range -> addReplica(instance, range, replicas)));
    }

    /**
     * @return partitioner of this ring
     */
    public Partitioner partitioner()
    {
        return partitioner;
    }

    /**
     * @return keyspace this ring was built for
     */
    public String keyspace()
    {
        return keyspace;
    }

    /**
     * @return instances of this ring, in the order they are stored (sorted by token at construction);
     *         this is the internal list, not a copy
     */
    public Collection<CassandraInstance> instances()
    {
        return instances;
    }

    /**
     * @param token token to look up
     * @return replicas of the range containing the token, or {@code null} if no range of the map contains it
     */
    public Collection<CassandraInstance> getReplicas(BigInteger token)
    {
        return replicas.get(token);
    }

    /**
     * @return mapping from token range to the replicas of that range; this is the internal map, not a copy
     */
    public RangeMap<BigInteger, List<CassandraInstance>> rangeMap()
    {
        return replicas;
    }

    /**
     * @return replication settings of the keyspace
     */
    public ReplicationFactor replicationFactor()
    {
        return replicationFactor;
    }

    /**
     * @param tokenRange token range of interest
     * @return view of the range-to-replicas mapping restricted to the given token range
     */
    public RangeMap<BigInteger, List<CassandraInstance>> getSubRanges(Range<BigInteger> tokenRange)
    {
        return replicas.subRangeMap(tokenRange);
    }

    /**
     * @return mapping from instance to the token ranges calculated for it; this is the internal multimap, not a copy
     */
    public Multimap<CassandraInstance, Range<BigInteger>> tokenRanges()
    {
        return tokenRangeMap;
    }

    /**
     * @return data center names taken from the replication options, or an empty collection for SimpleStrategy
     */
    private Collection<String> dataCenters()
    {
        return replicationFactor.getReplicationStrategy() == SimpleStrategy
               ? Collections.emptySet()
               : replicationFactor.getOptions().keySet();
    }

    /**
     * @return tokens of all instances, sorted in ascending order
     */
    public Collection<BigInteger> tokens()
    {
        return instances.stream()
                        .map(CassandraInstance::token)
                        .map(BigInteger::new)
                        .sorted()
                        .collect(Collectors.toList());
    }

    /**
     * @param dataCenter exact name of the data center
     * @return tokens of the instances in the given data center, in instance order
     * @throws IllegalArgumentException if the keyspace uses SimpleStrategy
     */
    public Collection<BigInteger> tokens(String dataCenter)
    {
        Preconditions.checkArgument(replicationFactor.getReplicationStrategy() != SimpleStrategy,
                                    "Datacenter tokens doesn't make sense for SimpleStrategy");
        // Exact comparison: the name must not be interpreted as a regular expression
        return instances.stream()
                        .filter(instance -> instance.dataCenter().equals(dataCenter))
                        .map(CassandraInstance::token)
                        .map(BigInteger::new)
                        .collect(Collectors.toList());
    }

    /**
     * Two rings are equal when their partitioner, keyspace, replication factor and instances are equal.
     * The derived fields {@link #replicas} and {@link #tokenRangeMap} are deliberately not compared:
     * they are computed by {@link #init()} from the compared fields only.
     */
    @Override
    public boolean equals(Object other)
    {
        if (this == other)
        {
            return true;
        }
        if (other == null || this.getClass() != other.getClass())
        {
            return false;
        }

        CassandraRing that = (CassandraRing) other;
        return new EqualsBuilder()
               .append(this.partitioner, that.partitioner)
               .append(this.keyspace, that.keyspace)
               .append(this.replicationFactor, that.replicationFactor)
               .append(this.instances, that.instances)
               .isEquals();
    }

    /**
     * Hash code over the same fields used by {@link #equals(Object)}; derived fields are excluded.
     */
    @Override
    public int hashCode()
    {
        return new HashCodeBuilder()
               .append(partitioner)
               .append(keyspace)
               .append(replicationFactor)
               .append(instances)
               .toHashCode();
    }

    /**
     * JDK deserialization hook; reads the format produced by {@link #writeObject(ObjectOutputStream)}
     * and then rebuilds the derived fields.
     * <p>
     * Layout: partitioner flag (byte, 0 = RandomPartitioner, anything else = Murmur3Partitioner), keyspace (UTF),
     * replication strategy (byte), option count (unsigned byte), options as (UTF name, byte value) pairs,
     * instance count (unsigned short), instances as (UTF token, UTF node name, UTF data center) triples.
     */
    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException
    {
        LOGGER.warn("Falling back to JDK deserialization");
        this.partitioner = in.readByte() == 0 ? Partitioner.RandomPartitioner : Partitioner.Murmur3Partitioner;
        this.keyspace = in.readUTF();

        ReplicationFactor.ReplicationStrategy strategy = ReplicationFactor.ReplicationStrategy.valueOf(in.readByte());
        // Counts are read unsigned: a signed read turns counts above 127 into negative numbers
        int optionCount = in.readUnsignedByte();
        Map<String, Integer> options = new HashMap<>(optionCount);
        for (int option = 0; option < optionCount; option++)
        {
            options.put(in.readUTF(), (int) in.readByte());
        }
        this.replicationFactor = new ReplicationFactor(strategy, options);

        // Same reasoning as above: a signed read would break for more than 32767 instances
        int numInstances = in.readUnsignedShort();
        this.instances = new ArrayList<>(numInstances);
        for (int instance = 0; instance < numInstances; instance++)
        {
            this.instances.add(new CassandraInstance(in.readUTF(), in.readUTF(), in.readUTF()));
        }

        this.init();
    }

    /**
     * JDK serialization hook; writes only the core fields, see {@link #readObject(ObjectInputStream)} for the layout.
     *
     * @throws IOException if writing fails, or if the number of replication options or instances
     *                     does not fit into the fixed-width count fields of the format
     */
    private void writeObject(ObjectOutputStream out) throws IOException
    {
        LOGGER.warn("Falling back to JDK serialization");
        Map<String, Integer> options = this.replicationFactor.getOptions();
        // Fail before writing anything rather than emit a silently truncated count
        if (options.size() > MAX_SERIALIZED_OPTIONS)
        {
            throw new IOException("Cannot serialize ring: " + options.size()
                                  + " replication options exceed the limit of " + MAX_SERIALIZED_OPTIONS);
        }
        if (this.instances.size() > MAX_SERIALIZED_INSTANCES)
        {
            throw new IOException("Cannot serialize ring: " + this.instances.size()
                                  + " instances exceed the limit of " + MAX_SERIALIZED_INSTANCES);
        }

        out.writeByte(this.partitioner == Partitioner.RandomPartitioner ? 0 : 1);
        out.writeUTF(this.keyspace);
        out.writeByte(this.replicationFactor.getReplicationStrategy().value);

        out.writeByte(options.size());
        for (Map.Entry<String, Integer> option : options.entrySet())
        {
            out.writeUTF(option.getKey());
            out.writeByte(option.getValue());
        }

        out.writeShort(this.instances.size());
        for (CassandraInstance instance : this.instances)
        {
            out.writeUTF(instance.token());
            out.writeUTF(instance.nodeName());
            out.writeUTF(instance.dataCenter());
        }
    }

    /**
     * Kryo serializer for {@link CassandraRing}; writes only the core fields and lets the constructor
     * recompute the derived ones on read.
     * <p>
     * Note that the partitioner flag here is 1 for RandomPartitioner and 0 otherwise,
     * which is the opposite of the flag used by the JDK serialization hooks of the ring.
     */
    public static class Serializer extends com.esotericsoftware.kryo.Serializer<CassandraRing>
    {
        @Override
        public void write(Kryo kryo, Output out, CassandraRing ring)
        {
            out.writeByte(ring.partitioner == Partitioner.RandomPartitioner ? 1 : 0);
            out.writeString(ring.keyspace);
            kryo.writeObject(out, ring.replicationFactor);
            kryo.writeObject(out, ring.instances);
        }

        @Override
        @SuppressWarnings("unchecked")
        public CassandraRing read(Kryo kryo, Input in, Class<CassandraRing> type)
        {
            // Read into locals so that the order of reads visibly mirrors the order of writes
            Partitioner partitioner = in.readByte() == 1 ? Partitioner.RandomPartitioner
                                                         : Partitioner.Murmur3Partitioner;
            String keyspace = in.readString();
            ReplicationFactor replicationFactor = kryo.readObject(in, ReplicationFactor.class);
            Collection<CassandraInstance> instances = kryo.readObject(in, ArrayList.class);
            return new CassandraRing(partitioner, keyspace, replicationFactor, instances);
        }
    }
}