blob: a46437ca09d93064f71839f106df41f768746ad0 [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.cassandra.sidecar.adapters.base;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.jetbrains.annotations.NotNull;
/**
 * Representation of a token range (exclusive start and inclusive end - (start, end]) and the
 * corresponding mapping to replica-set hosts. Static factory ensures that ranges are always unwrapped.
 * Note: Range comparisons are used for ordering of ranges, e.g. A.compareTo(B) <= 0 implies that
 * range A occurs before range B; it says nothing about their relative sizes.
 */
public class TokenRangeReplicas implements Comparable<TokenRangeReplicas>
// Lower bound of the token range; excluded from the range, i.e. the range is (start, end]
private final BigInteger start;
// Upper bound of the token range; included in the range
private final BigInteger end;
// Partitioner supplying the minToken/maxToken used when unwrapping; compared by identity in equals()
private final Partitioner partitioner;
// Hosts replicating this range; the constructor stores the caller's set by reference (no defensive copy)
private final Set<String> replicaSet;
private static final Logger LOGGER = LoggerFactory.getLogger(TokenRangeReplicas.class);
/**
 * Creates the range {@code (start, end]} mapped to the given replica-set. Private so that instances are
 * only produced inside this class. No validation or unwrapping happens here, and the replica-set is kept
 * by reference rather than copied.
 *
 * @param start       exclusive start token of the range
 * @param end         inclusive end token of the range
 * @param partitioner partitioner the tokens belong to
 * @param replicaSet  hosts replicating the range
 */
private TokenRangeReplicas(BigInteger start, BigInteger end, Partitioner partitioner, Set<String> replicaSet)
{
    this.partitioner = partitioner;
    this.replicaSet = replicaSet;
    this.start = start;
    this.end = end;
}
public static List<TokenRangeReplicas> generateTokenRangeReplicas(BigInteger start,
BigInteger end,
Partitioner partitioner,
Set<String> replicaSet)
if (start.compareTo(end) >= 0)
return unwrapRange(start, end, partitioner, replicaSet);
}"Generating replica-map for range: {} - {} : Replicaset: {}", start, end, replicaSet);
return Collections.singletonList(new TokenRangeReplicas(start, end, partitioner, replicaSet));
public BigInteger start()
return start;
public BigInteger end()
return end;
public Set<String> replicaSet()
return replicaSet;
* {@inheritDoc}
public int compareTo(@NotNull TokenRangeReplicas other)
int compareStart = this.start.compareTo(other.start);
return (compareStart != 0) ? compareStart : this.end.compareTo(other.end);
* {@inheritDoc}
public boolean equals(Object o)
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
TokenRangeReplicas that = (TokenRangeReplicas) o;
return Objects.equals(start, that.start)
&& Objects.equals(end, that.end)
&& partitioner == that.partitioner
&& replicaSet.equals(that.replicaSet);
* {@inheritDoc}
public int hashCode()
return Objects.hash(start, end, partitioner);
private void validateRangesForComparison(@NotNull TokenRangeReplicas other)
if (this.partitioner != other.partitioner)
throw new IllegalStateException("Token ranges being compared do not have the same partitioner");
boolean contains(TokenRangeReplicas other)
return (other.start.compareTo(this.start) >= 0 && other.end.compareTo(this.end) <= 0);
* Determines intersection if the next range starts before the current range ends. This method assumes that
* the provided ranges are sorted and unwrapped.
* When the current range goes all the way to the end, we determine intersection if the next range starts
* after the current since all subsequent ranges have to be subsets.
* @param other the range we are currently processing to check if "this" intersects it
* @return true if "this" range intersects the other
boolean intersects(TokenRangeReplicas other)
boolean inOrder = this.compareTo(other) <= 0;
TokenRangeReplicas first = inOrder ? this : other;
TokenRangeReplicas last = inOrder ? other : this;
return first.end.compareTo(last.start) > 0 && first.start.compareTo(last.end) < 0;
* Unwraps the token range if it wraps-around to end either on or after the least token by overriding such
* ranges to end at the partitioner max-token value in the former case and splitting into 2 ranges in the latter
* case.
* @return list of split ranges
private static List<TokenRangeReplicas> unwrapRange(BigInteger start,
BigInteger end,
Partitioner partitioner,
Set<String> replicaSet)
// Range ending at minToken is "unwrapped" to end at the maxToken.
// Note: These being open-closed ranges, this will result in exclusion of partitioner's minToken from
// allocation. This is by-design as it is never assigned to a node in Cassandra:
if (end.compareTo(partitioner.minToken) == 0)
return Collections.singletonList(
new TokenRangeReplicas(start, partitioner.maxToken, partitioner, replicaSet));
else if (start.compareTo(partitioner.maxToken) == 0)
return Collections.singletonList(
new TokenRangeReplicas(partitioner.minToken, end, partitioner, replicaSet));
// Wrap-around range goes beyond at the "min-token" and is therefore split into two.
List<TokenRangeReplicas> unwrapped = new ArrayList<>(2);
unwrapped.add(new TokenRangeReplicas(start, partitioner.maxToken, partitioner, replicaSet));
unwrapped.add(new TokenRangeReplicas(partitioner.minToken, end, partitioner, replicaSet));
return unwrapped;
* Given a list of token ranges with replica-sets, normalizes them by unwrapping around the beginning/min
* of the range and removing overlaps to return a sorted list of non-overlapping ranges.
* <p>
* For an overlapping range that is included in both natural and pending ranges, say R_natural and R_pending
* (where R_natural == R_pending), the replicas of both R_natural and R_pending should receive writes.
* Therefore, the write-replicas of such range is the union of both replica sets.
* This method implements the consolidation process.
* @param ranges
* @return sorted list of non-overlapping ranges and replica-sets
public static List<TokenRangeReplicas> normalize(List<TokenRangeReplicas> ranges)
if ( -> r.partitioner.minToken.compareTo(r.start()) == 0))
LOGGER.warn("{} based minToken does not exist in the token ranges",
return deoverlap(ranges);
* Given a list of unwrapped (around the starting/min value) token ranges and their replica-sets, return list of
* ranges with no overlaps. Any impacted range absorbs the replica-sets from the overlapping range.
* This is to ensure that we have most coverage while using the replica-sets as write-replicas.
* Overlaps are removed by splitting the original range around the overlap boundaries, resulting in sub-ranges
* with replicas from all the overlapping replicas.
* <pre>
* Illustration:
* Input with C overlapping with A and B
* |----------A-----------||----------B-------------|
* |--------C----------|
* Split result: C is split first which further splits A and B to create
* |-----------A----------||----------B-------------|
* |---C---|----C'----|
* Subsets C & C' are merged into supersets A and B by splitting them. Replica-sets for A,C and B,C are merged
* for the resulting ranges.
* |-----A------|----AC---||---BC-----|-----B------|
* </pre>
private static List<TokenRangeReplicas> deoverlap(List<TokenRangeReplicas> allRanges)
if (allRanges.isEmpty())
return allRanges;
LOGGER.debug("Token ranges to be normalized: {}", allRanges);
List<TokenRangeReplicas> ranges = mergeIdenticalRanges(allRanges);
List<TokenRangeReplicas> output = new ArrayList<>();
Iterator<TokenRangeReplicas> iter = ranges.iterator();
TokenRangeReplicas current =;
while (iter.hasNext())
TokenRangeReplicas next =;
if (!current.intersects(next))
current = next;
current = processIntersectingRanges(output, iter, current, next);
if (current != null)
return output;
// Collapses ranges that are equal (per equals/hashCode) into one entry each and returns the resulting list.
// NOTE(review): this method looks truncated - several statements that the control flow requires are not
// visible (see the notes below). Restore it from version control before relying on it.
private static List<TokenRangeReplicas> mergeIdenticalRanges(List<TokenRangeReplicas> ranges)
// Keyed by range. Note that equals() on the key also compares replicaSet, while hashCode() does not.
Map<TokenRangeReplicas, Set<String>> rangeMapping = new HashMap<>();
for (TokenRangeReplicas r: ranges)
if (!rangeMapping.containsKey(r))
// The value stored is the key's own replicaSet instance, not a copy
rangeMapping.put(r, r.replicaSet);
// NOTE(review): no handling is visible for a range whose key is already present (presumably its replicas
// are meant to be merged into the existing entry) - confirm.
List<TokenRangeReplicas> merged = new ArrayList<>();
for (Map.Entry<TokenRangeReplicas, Set<String>> entry : rangeMapping.entrySet())
TokenRangeReplicas r = entry.getKey();
if (!r.replicaSet().equals(entry.getValue()))
// NOTE(review): the body of this condition and the statement adding to 'merged' are not visible here.
// HashMap iteration order is unspecified, so callers needing sorted ranges depend on a sort that is
// not shown - confirm.
return merged;
/**
 * Splits intersecting token ranges starting from the provided cursors and the iterator, while accumulating
 * overlapping replicas into each sub-range.
 * <p>
 * The algorithm 1) extracts all intersecting ranges at the provided cursor, and 2) maintains a min-heap of all
 * intersecting ranges ordered by the end of the range, so that the least common sub-range relative to the current
 * range can be extracted.
 *
 * @param output  ongoing list of resulting non-overlapping ranges
 * @param iter    iterator over the list of ranges
 * @param current cursor to the current, intersecting range
 * @param next    cursor to the intersecting range after the current range
 * @return cursor to the subsequent non-intersecting range
 */
// Resolves one run of mutually intersecting ranges into sub-ranges appended to 'output' and returns the
// cursor handed back by extractIntersectingRanges.
// NOTE(review): this method is visibly truncated - the iteration header before "-> {", the remaining
// constructor arguments of every "new TokenRangeReplicas(newStart," call and any heap add/poll statements
// are missing. The comments below describe only what is visible; restore the method from version control.
static TokenRangeReplicas processIntersectingRanges(List<TokenRangeReplicas> output,
Iterator<TokenRangeReplicas> iter,
TokenRangeReplicas current,
TokenRangeReplicas next)
// min-heap with a comparator comparing the ends of ranges; ranges with equal ends fall back to the
// natural ordering (compareTo)
PriorityQueue<TokenRangeReplicas> rangeHeap =
new PriorityQueue<>((n1, n2) -> (!n1.end.equals(n2.end())) ?
n1.end().compareTo(n2.end()) : n1.compareTo(n2));
// Collects the intersecting ranges; 'next' is re-assigned to the cursor returned by the extraction
List<TokenRangeReplicas> intersectingRanges = new ArrayList<>();
next = extractIntersectingRanges(intersectingRanges::add, iter, current, next);
// Seed the heap with the first intersecting range.
// NOTE(review): the expression that introduces the lambda parameter 'r' used below is missing here;
// presumably it iterates over the remaining intersecting ranges - confirm.
rangeHeap.add(intersectingRanges.get(0)); -> {
if (!rangeHeap.isEmpty())
TokenRangeReplicas range = rangeHeap.peek();
// Use the last processed range's end as the new range's start
// Except when it's the first range, in which case, we use the queue-head's start
BigInteger newStart = output.isEmpty() ? range.start() : output.get(output.size() - 1).end();
// Case 1: r starts exactly where the heap-head range ends
if (r.start().compareTo(rangeHeap.peek().end()) == 0)
output.add(new TokenRangeReplicas(newStart,
// Case 2: r starts after the heap-head range ends
else if (r.start().compareTo(rangeHeap.peek().end()) > 0)
output.add(new TokenRangeReplicas(newStart,
// Start-token is before the first intersecting range end. We have not encountered end of the range, so
// it is not removed from the heap yet.
// A sub-range is only emitted here when it would not be empty (newStart differs from r's start)
if (newStart.compareTo(r.start()) != 0)
output.add(new TokenRangeReplicas(newStart,
// Remaining intersecting ranges from heap are processed
// NOTE(review): no statement removing elements from the heap is visible inside this loop - confirm.
while (!rangeHeap.isEmpty())
// The message is built by string concatenation, so it is assembled even when debug logging is disabled
LOGGER.debug("Non-empty heap while resolving intersecting ranges:" + rangeHeap.size());
TokenRangeReplicas nextVal = rangeHeap.peek();
BigInteger newStart = output.isEmpty() ? nextVal.start() : output.get(output.size() - 1).end();
// Corner case w/ common end ranges - we do not add redundant single token range
if (newStart.compareTo(nextVal.end()) != 0)
output.add(new TokenRangeReplicas(newStart,
return next;
/**
 * Extracts all the intersecting ranges starting from the current cursor, which we know is intersecting with the
 * next range. Note that the cursor is moved forward until a non-intersecting range is found.
 *
 * @param rangeConsumer functional interface to collect candidate intersecting ranges
 * @param iter          ongoing iterator over the entire range-set
 * @param current       cursor to the current, intersecting range
 * @param next          cursor to the next intersecting range
 * @return cursor to the range at which extraction stopped (the first non-intersecting range), or
 *         {@code null} when the iterator was exhausted
 */
// Advances 'iter' past the run of ranges intersecting the tracked range and returns the cursor to continue from.
// NOTE(review): this method is visibly truncated - the right-hand side of "next =", the body of the
// non-intersection check (presumably a break) and every call on 'rangeConsumer' are missing. Restore it
// from version control; the comments below describe only what is visible.
private static TokenRangeReplicas extractIntersectingRanges(Consumer<TokenRangeReplicas> rangeConsumer,
Iterator<TokenRangeReplicas> iter,
TokenRangeReplicas current,
TokenRangeReplicas next)
// we know that current and next intersect
// Keep tracking 'current' when it fully contains 'next'; otherwise track 'next'
current = (current.contains(next)) ? current : next;
// Cleared so that null is returned when the iterator has nothing further
next = null;
while (iter.hasNext())
next =;
if (!current.intersects(next))
// when next is subset of current, we keep tracking current
current = (current.contains(next)) ? current : next;
next = null;
return next;
// TODO: Verify why we need all replicas from queue
private static Set<String> getBatchReplicas(PriorityQueue<TokenRangeReplicas> rangeHeap)
private static Set<String> mergeReplicas(TokenRangeReplicas current, TokenRangeReplicas next)
Set<String> merged = new HashSet<>(current.replicaSet);
return merged;
* {@inheritDoc}
public String toString()
return String.format("Range(%s, %s]: %s:%s", start.toString(), end.toString(), replicaSet, partitioner);