blob: 9bcefb6aa4638e45d0984a361c4fb8cd419aa089 [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.cassandra.schema.SchemaConstants;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.dht.ByteOrderedPartitioner;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Murmur3Partitioner;
import org.apache.cassandra.dht.OrderPreservingPartitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.dht.Token.TokenFactory;
import org.apache.hugegraph.backend.BackendException;
import org.apache.hugegraph.util.Bytes;
import com.datastax.driver.core.Host;
import com.datastax.driver.core.Metadata;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.TokenRange;
/**
 * CassandraShard is used for cassandra scanning operations.
 * Each shard represents a range of tokens for a node.
 * Reading data from a given shard does not cross multiple nodes.
 * <p>
 * Refer to AbstractColumnFamilyInputFormat from:
 * <a href="">...</a>
 */
public class CassandraShard {
/** The minimal shard size should be >= 1MB to prevent too many shards. */
private static final int MIN_SHARD_SIZE = (int) Bytes.MB;

/** Session used to read cluster metadata and to query the size estimates. */
private final CassandraSessionPool.Session session;
/** Keyspace that contains the table to be split. */
private final String keyspace;
/** Table whose token ranges are split into shards. */
private final String table;
/** Partitioner used to convert between token strings and tokens. */
private final IPartitioner partitioner;
/**
 * Create a shard calculator bound to one table.
 *
 * @param session  session used for metadata lookups and queries
 * @param keyspace keyspace that contains the table
 * @param table    table to compute shards for
 */
public CassandraShard(CassandraSessionPool.Session session,
                      String keyspace, String table) {
    // The partitioner is always Murmur3, it is not read from the cluster
    this.partitioner = new Murmur3Partitioner();
    this.table = table;
    this.keyspace = keyspace;
    this.session = session;
}
* Get splits of a table
* @param splitPartitions: expected partitions count per split
* @param splitSize: expected size(bytes) per split,
* splitPartitions will be ignored if splitSize is passed
* @return a list of Shard
public List<Shard> getSplits(long splitPartitions, long splitSize) {
// Canonical ranges, split into pieces, fetch the splits in parallel
ExecutorService executor = new ThreadPoolExecutor(0, 128, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>());
List<Shard> splits = new ArrayList<>();
try {
List<Future<List<Shard>>> futures = new ArrayList<>();
// Canonical ranges and nodes holding replicas
Map<TokenRange, Set<Host>> masterRangeNodes = getRangeMap();
for (TokenRange range : masterRangeNodes.keySet()) {
* For each token range, pick a live owner and ask it to
* compute bite-sized splits.
futures.add(executor.submit(new SplitCallable(
range, splitPartitions, splitSize)));
// Wait until we have all the results back
for (Future<List<Shard>> future : futures) {
try {
} catch (Exception e) {
throw new BackendException("Can't get cassandra shards", e);
assert splits.size() > masterRangeNodes.size();
} finally {
Collections.shuffle(splits, new Random(System.nanoTime()));
return splits;
* Get splits of a table in specified range
* NOTE: maybe we don't need this method
* @param start: the start of range
* @param end: the end of range
* @param splitPartitions: expected partitions count per split
* @param splitSize: expected size(bytes) per split,
* splitPartitions will be ignored if splitSize is passed
* @return a list of Shard
public List<Shard> getSplits(String start, String end,
int splitPartitions, int splitSize) {
ExecutorService executor = new ThreadPoolExecutor(0, 128, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>());
List<Shard> splits = new ArrayList<>();
try {
List<Future<List<Shard>>> futures = new ArrayList<>();
TokenFactory tokenFactory = this.partitioner.getTokenFactory();
TokenRange tokenRange = rangeToTokenRange(new Range<>(
// Canonical ranges and nodes holding replicas
Map<TokenRange, Set<Host>> masterRangeNodes = getRangeMap();
for (TokenRange range : masterRangeNodes.keySet()) {
for (TokenRange r : range.intersectWith(tokenRange)) {
// For each tokenRange, pick a live owner and ask it
// to compute bite-sized splits
futures.add(executor.submit(new SplitCallable(
r, splitPartitions, splitSize)));
// Wait until we have all the results back
for (Future<List<Shard>> future : futures) {
try {
} catch (Exception e) {
throw new BackendException("Can't get cassandra shards", e);
assert splits.size() >= masterRangeNodes.size();
} finally {
Collections.shuffle(splits, new Random(System.nanoTime()));
return splits;
private boolean isPartitionerOpp() {
return this.partitioner instanceof OrderPreservingPartitioner ||
this.partitioner instanceof ByteOrderedPartitioner;
private TokenRange rangeToTokenRange(Range<Token> range) {
TokenFactory tokenFactory = this.partitioner.getTokenFactory();
Metadata metadata = this.session.metadata();
return metadata.newTokenRange(
private Map<TokenRange, Long> getSubSplits(TokenRange tokenRange,
long splitPartitions,
long splitSize) {
try {
return describeSplits(this.session, this.keyspace, this.table,
splitPartitions, splitSize, tokenRange);
} catch (Exception e) {
throw new RuntimeException(e);
private Map<TokenRange, Set<Host>> getRangeMap() {
Metadata metadata = this.session.metadata();
return metadata.getTokenRanges().stream().collect(Collectors.toMap(
p -> p,
p -> metadata.getReplicas('"' + this.keyspace + '"', p)));
private static Map<TokenRange, Long> describeSplits(
CassandraSessionPool.Session session,
String keyspace,
String table,
long splitPartitions,
long splitSize,
TokenRange tokenRange) {
String query = String.format(
"SELECT mean_partition_size, partitions_count FROM %s.%s " +
"WHERE keyspace_name = ? AND table_name = ? AND " +
"range_start = ? AND range_end = ?",
ResultSet resultSet = session.execute(query, keyspace, table,
Row row =;
long meanPartitionSize;
long partitionsCount = 0L;
long splitCount = 0L;
if (row != null) {
meanPartitionSize = row.getLong("mean_partition_size");
partitionsCount = row.getLong("partitions_count");
assert splitSize <= 0 || splitSize >= MIN_SHARD_SIZE;
splitCount = splitSize > 0 ?
(meanPartitionSize * partitionsCount / splitSize) :
(partitionsCount / splitPartitions);
* If we have no data on this split or the size estimate is 0,
* return the full split i.e., do not sub-split
* Assume the smallest granularity of partition count available from CASSANDRA-7688.
if (splitCount == 0) {
return ImmutableMap.of(tokenRange, (long) 128);
List<TokenRange> ranges = tokenRange.splitEvenly((int) splitCount);
Map<TokenRange, Long> rangesWithLength = new HashMap<>();
for (TokenRange range : ranges) {
// Add a sub-range (with its partitions count per sub-range)
rangesWithLength.put(range, partitionsCount / splitCount);
return rangesWithLength;
* Gets a token tokenRange and splits it up according to the suggested size
* into input splits that Hugegraph can use.
class SplitCallable implements Callable<List<Shard>> {
private final TokenRange tokenRange;
private final long splitPartitions;
private final long splitSize;
public SplitCallable(TokenRange tokenRange,
long splitPartitions, long splitSize) {
if (splitSize <= 0 && splitPartitions <= 0) {
throw new IllegalArgumentException(String.format(
"The split-partitions must be > 0, but got %s",
if (splitSize > 0 && splitSize < MIN_SHARD_SIZE) {
// splitSize should be at least 1M if passed
throw new IllegalArgumentException(String.format(
"The split-size must be >= %s bytes, but got %s",
MIN_SHARD_SIZE, splitSize));
this.tokenRange = tokenRange;
this.splitPartitions = splitPartitions;
this.splitSize = splitSize;
public List<Shard> call() throws Exception {
ArrayList<Shard> splits = new ArrayList<>();
Map<TokenRange, Long> subSplits = getSubSplits(
for (Map.Entry<TokenRange, Long> entry : subSplits.entrySet()) {
List<TokenRange> ranges = entry.getKey().unwrap();
for (TokenRange subrange : ranges) {
String start = !isPartitionerOpp() ?
subrange.getStart().toString() :
String end = !isPartitionerOpp() ?
subrange.getEnd().toString() :
long length = entry.getValue();
splits.add(new Shard(start, end, length));
return splits;