blob: d2a80583af9af0a6ec99c492074c3680f825e458 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.utils;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.*;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.AbstractIterator;
import com.google.common.collect.PeekingIterator;
import org.slf4j.Logger;
import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
/**
* Wrapper class for handling of multiple MerkleTrees at once.
*
* The MerkleTree's are divided in Ranges of non-overlapping tokens.
*/
public class MerkleTrees implements Iterable<Map.Entry<Range<Token>, MerkleTree>>
{
public static final MerkleTreesSerializer serializer = new MerkleTreesSerializer();
private Map<Range<Token>, MerkleTree> merkleTrees = new TreeMap<>(new TokenRangeComparator());
private IPartitioner partitioner;
/**
* Creates empty MerkleTrees object.
*
* @param partitioner The partitioner to use
*/
public MerkleTrees(IPartitioner partitioner)
{
this(partitioner, new ArrayList<>());
}
private MerkleTrees(IPartitioner partitioner, Collection<MerkleTree> merkleTrees)
{
this.partitioner = partitioner;
addTrees(merkleTrees);
}
/**
* Get the ranges that these merkle trees covers.
*
* @return
*/
public Collection<Range<Token>> ranges()
{
return merkleTrees.keySet();
}
/**
* Get the partitioner in use.
*
* @return
*/
public IPartitioner partitioner()
{
return partitioner;
}
/**
* Add merkle tree's with the defined maxsize and ranges.
*
* @param maxsize
* @param ranges
*/
public void addMerkleTrees(int maxsize, Collection<Range<Token>> ranges)
{
for (Range<Token> range : ranges)
{
addMerkleTree(maxsize, range);
}
}
/**
* Add a MerkleTree with the defined size and range.
*
* @param maxsize
* @param range
* @return The created merkle tree.
*/
public MerkleTree addMerkleTree(int maxsize, Range<Token> range)
{
return addMerkleTree(maxsize, MerkleTree.RECOMMENDED_DEPTH, range);
}
@VisibleForTesting
public MerkleTree addMerkleTree(int maxsize, byte hashdepth, Range<Token> range)
{
MerkleTree tree = new MerkleTree(partitioner, range, hashdepth, maxsize);
addTree(tree);
return tree;
}
/**
* Get the MerkleTree.Range responsible for the given token.
*
* @param t
* @return
*/
@VisibleForTesting
public MerkleTree.TreeRange get(Token t)
{
return getMerkleTree(t).get(t);
}
/**
* Init all MerkleTree's with an even tree distribution.
*/
public void init()
{
for (Range<Token> range : merkleTrees.keySet())
{
init(range);
}
}
/**
* Init a selected MerkleTree with an even tree distribution.
*
* @param range
*/
public void init(Range<Token> range)
{
merkleTrees.get(range).init();
}
/**
* Split the MerkleTree responsible for the given token.
*
* @param t
* @return
*/
public boolean split(Token t)
{
return getMerkleTree(t).split(t);
}
/**
* Invalidate the MerkleTree responsible for the given token.
*
* @param t
*/
@VisibleForTesting
public void invalidate(Token t)
{
getMerkleTree(t).invalidate(t);
}
/**
* Get the MerkleTree responsible for the given token range.
*
* @param range
* @return
*/
public MerkleTree getMerkleTree(Range<Token> range)
{
return merkleTrees.get(range);
}
public long size()
{
long size = 0;
for (MerkleTree tree : merkleTrees.values())
{
size += tree.size();
}
return size;
}
@VisibleForTesting
public void maxsize(Range<Token> range, int maxsize)
{
getMerkleTree(range).maxsize(maxsize);
}
/**
* Get the MerkleTree responsible for the given token.
*
* @param t
* @return The given MerkleTree or null if none exist.
*/
private MerkleTree getMerkleTree(Token t)
{
for (Range<Token> range : merkleTrees.keySet())
{
if (range.contains(t))
return merkleTrees.get(range);
}
throw new AssertionError("Expected tree for token " + t);
}
private void addTrees(Collection<MerkleTree> trees)
{
for (MerkleTree tree : trees)
{
addTree(tree);
}
}
private void addTree(MerkleTree tree)
{
assert validateNonOverlapping(tree) : "Range [" + tree.fullRange + "] is intersecting an existing range";
merkleTrees.put(tree.fullRange, tree);
}
private boolean validateNonOverlapping(MerkleTree tree)
{
for (Range<Token> range : merkleTrees.keySet())
{
if (tree.fullRange.intersects(range))
return false;
}
return true;
}
/**
* Get an iterator for all the invalids generated by the MerkleTrees.
*
* @return
*/
public TreeRangeIterator invalids()
{
return new TreeRangeIterator();
}
/**
* Log the row count per leaf for all MerkleTrees.
*
* @param logger
*/
public void logRowCountPerLeaf(Logger logger)
{
for (MerkleTree tree : merkleTrees.values())
{
tree.histogramOfRowCountPerLeaf().log(logger);
}
}
/**
* Log the row size per leaf for all MerkleTrees.
*
* @param logger
*/
public void logRowSizePerLeaf(Logger logger)
{
for (MerkleTree tree : merkleTrees.values())
{
tree.histogramOfRowSizePerLeaf().log(logger);
}
}
@VisibleForTesting
public byte[] hash(Range<Token> range)
{
ByteArrayOutputStream baos = new ByteArrayOutputStream();
boolean hashed = false;
try
{
for (Range<Token> rt : merkleTrees.keySet())
{
if (rt.intersects(range))
{
byte[] bytes = merkleTrees.get(rt).hash(range);
if (bytes != null)
{
baos.write(bytes);
hashed = true;
}
}
}
}
catch (IOException e)
{
throw new RuntimeException("Unable to append merkle tree hash to result");
}
return hashed ? baos.toByteArray() : null;
}
/**
* Get an iterator of all ranges and their MerkleTrees.
*/
public Iterator<Map.Entry<Range<Token>, MerkleTree>> iterator()
{
return merkleTrees.entrySet().iterator();
}
public long rowCount()
{
long totalCount = 0;
for (MerkleTree tree : merkleTrees.values())
{
totalCount += tree.rowCount();
}
return totalCount;
}
public class TreeRangeIterator extends AbstractIterator<MerkleTree.TreeRange> implements
Iterable<MerkleTree.TreeRange>,
PeekingIterator<MerkleTree.TreeRange>
{
private final Iterator<MerkleTree> it;
private MerkleTree.TreeRangeIterator current = null;
private TreeRangeIterator()
{
it = merkleTrees.values().iterator();
}
public MerkleTree.TreeRange computeNext()
{
if (current == null || !current.hasNext())
return nextIterator();
return current.next();
}
private MerkleTree.TreeRange nextIterator()
{
if (it.hasNext())
{
current = it.next().invalids();
return current.next();
}
return endOfData();
}
public Iterator<MerkleTree.TreeRange> iterator()
{
return this;
}
}
/**
* Get the differences between the two sets of MerkleTrees.
*
* @param ltree
* @param rtree
* @return
*/
public static List<Range<Token>> difference(MerkleTrees ltree, MerkleTrees rtree)
{
List<Range<Token>> differences = new ArrayList<>();
for (MerkleTree tree : ltree.merkleTrees.values())
{
differences.addAll(MerkleTree.difference(tree, rtree.getMerkleTree(tree.fullRange)));
}
return differences;
}
public static class MerkleTreesSerializer implements IVersionedSerializer<MerkleTrees>
{
public void serialize(MerkleTrees trees, DataOutputPlus out, int version) throws IOException
{
out.writeInt(trees.merkleTrees.size());
for (MerkleTree tree : trees.merkleTrees.values())
{
MerkleTree.serializer.serialize(tree, out, version);
}
}
public MerkleTrees deserialize(DataInputPlus in, int version) throws IOException
{
IPartitioner partitioner = null;
int nTrees = in.readInt();
Collection<MerkleTree> trees = new ArrayList<>(nTrees);
if (nTrees > 0)
{
for (int i = 0; i < nTrees; i++)
{
MerkleTree tree = MerkleTree.serializer.deserialize(in, version);
trees.add(tree);
if (partitioner == null)
partitioner = tree.partitioner();
else
assert tree.partitioner() == partitioner;
}
}
return new MerkleTrees(partitioner, trees);
}
public long serializedSize(MerkleTrees trees, int version)
{
assert trees != null;
long size = TypeSizes.sizeof(trees.merkleTrees.size());
for (MerkleTree tree : trees.merkleTrees.values())
{
size += MerkleTree.serializer.serializedSize(tree, version);
}
return size;
}
}
private static class TokenRangeComparator implements Comparator<Range<Token>>
{
@Override
public int compare(Range<Token> rt1, Range<Token> rt2)
{
if (rt1.left.compareTo(rt2.left) == 0)
return 0;
return rt1.compareTo(rt2);
}
}
}