blob: a2a2512aa2653a29fe4bc99776f2b1c97daf6fed [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.repair;
import java.net.InetAddress;
import java.security.MessageDigest;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.db.rows.UnfilteredRowIterators;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.repair.messages.ValidationComplete;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.MerkleTree;
import org.apache.cassandra.utils.MerkleTree.RowHash;
import org.apache.cassandra.utils.MerkleTrees;
/**
 * Handles the building of a merkle tree for a column family.
 *
 * Lifecycle:
 * 1. prepare() - Initialize the trees, either evenly or by splitting them at sampled partition keys.
 * 2. add() - 0 or more times, in token order, to add partition hashes to the tree.
 * 3. complete() - Finalizes the trees and schedules them to be sent to the repair initiator.
 */
public class Validator implements Runnable
{
    private static final Logger logger = LoggerFactory.getLogger(Validator.class);

    public final RepairJobDesc desc;
    public final InetAddress initiator;
    public final int gcBefore;
    // when true, prepare() initialises the trees evenly instead of splitting them at sampled keys
    private final boolean evenTreeDistribution;

    // number of partitions hashed so far (incremented once per add() call)
    private long validated;
    // the trees being built; assigned in prepare()
    private MerkleTrees trees;
    // current range being updated
    private MerkleTree.TreeRange range;
    // iterator for iterating sub ranges (MT's leaves)
    private MerkleTrees.TreeRangeIterator ranges;
    // last key seen; used to assert that partitions arrive in order
    private DecoratedKey lastKey;

    public Validator(RepairJobDesc desc, InetAddress initiator, int gcBefore)
    {
        this(desc, initiator, gcBefore, false);
    }

    public Validator(RepairJobDesc desc, InetAddress initiator, int gcBefore, boolean evenTreeDistribution)
    {
        this.desc = desc;
        this.initiator = initiator;
        this.gcBefore = gcBefore;
        this.evenTreeDistribution = evenTreeDistribution;
    }

    /**
     * Initialises the given trees and prepares this validator to receive partitions via add().
     *
     * If the partitioner does not preserve order, or an even distribution was requested, the trees are
     * initialised evenly. Otherwise, for each range, the tree is repeatedly split at the token of a randomly
     * chosen key sample until split() returns false; ranges without any key samples are initialised evenly.
     *
     * @param cfs  column family whose key samples drive the splits
     * @param tree the trees to initialise and then populate
     */
    public void prepare(ColumnFamilyStore cfs, MerkleTrees tree)
    {
        this.trees = tree;

        if (!tree.partitioner().preservesOrder() || evenTreeDistribution)
        {
            // You can't beat an even tree distribution for md5
            tree.init();
        }
        else
        {
            List<DecoratedKey> keys = new ArrayList<>();
            Random random = new Random();

            // named subRange so it does not shadow the 'range' field
            for (Range<Token> subRange : tree.ranges())
            {
                for (DecoratedKey sample : cfs.keySamples(subRange))
                {
                    assert subRange.contains(sample.getToken()) : "Token " + sample.getToken() + " is not within range " + subRange;
                    keys.add(sample);
                }

                if (keys.isEmpty())
                {
                    // use an even tree distribution
                    tree.init(subRange);
                }
                else
                {
                    int numKeys = keys.size();
                    // sample the column family using random keys from the index
                    while (true)
                    {
                        DecoratedKey dk = keys.get(random.nextInt(numKeys));
                        if (!tree.split(dk.getToken()))
                            break;
                    }
                    // reuse the list for the next range
                    keys.clear();
                }
            }
        }
        logger.debug("Prepared AEService trees of size {} for {}", trees.size(), desc);
        ranges = tree.invalids();
    }

    /**
     * Called (in order) for every partition present in the CF.
     * Hashes the partition, and adds it to the tree being built.
     *
     * @param partition Partition to add hash
     */
    public void add(UnfilteredRowIterator partition)
    {
        assert Range.isInRanges(partition.partitionKey().getToken(), desc.ranges) : partition.partitionKey().getToken() + " is not contained in " + desc.ranges;
        assert lastKey == null || lastKey.compareTo(partition.partitionKey()) < 0
               : "partition " + partition.partitionKey() + " received out of order wrt " + lastKey;
        lastKey = partition.partitionKey();

        if (range == null)
            range = ranges.next();

        // advance through the remaining leaf ranges until one contains the token
        if (!findCorrectRange(lastKey.getToken()))
        {
            // not found among the remaining ranges: restart from the first invalid range and search again
            ranges = trees.invalids();
            findCorrectRange(lastKey.getToken());
        }

        assert range.contains(lastKey.getToken()) : "Token not in MerkleTree: " + lastKey.getToken();

        // mix the partition's hash into the matching leaf; skipped when nothing was digested (CASSANDRA-8979)
        RowHash rowHash = rowHash(partition);
        if (rowHash != null)
        {
            range.addHash(rowHash);
        }
    }

    /**
     * Advances the current range through the leaf iterator until it contains {@code t}
     * or the iterator is exhausted.
     *
     * @param t token to locate
     * @return true if the current range contains {@code t} after advancing
     */
    public boolean findCorrectRange(Token t)
    {
        while (!range.contains(t) && ranges.hasNext())
        {
            range = ranges.next();
        }

        return range.contains(t);
    }

    /**
     * MessageDigest that forwards all input to {@code underlying} and counts how many bytes were fed in.
     */
    static class CountingDigest extends MessageDigest
    {
        private long count;
        private final MessageDigest underlying;

        public CountingDigest(MessageDigest underlying)
        {
            super(underlying.getAlgorithm());
            this.underlying = underlying;
        }

        @Override
        protected void engineUpdate(byte input)
        {
            underlying.update(input);
            count += 1;
        }

        @Override
        protected void engineUpdate(byte[] input, int offset, int len)
        {
            underlying.update(input, offset, len);
            count += len;
        }

        // without this override getDigestLength() would report 0, because this class is not Cloneable
        @Override
        protected int engineGetDigestLength()
        {
            return underlying.getDigestLength();
        }

        // count is deliberately kept after digest() so that rowHash() can read it alongside the hash
        @Override
        protected byte[] engineDigest()
        {
            return underlying.digest();
        }

        // an explicit reset() discards both the hash state and the byte count so they stay consistent
        @Override
        protected void engineReset()
        {
            underlying.reset();
            count = 0;
        }
    }

    /**
     * Hashes a partition and counts it as validated.
     *
     * @return the partition's hash, or null if the digest received no bytes
     */
    private MerkleTree.RowHash rowHash(UnfilteredRowIterator partition)
    {
        validated++;
        // MerkleTree uses XOR internally, so we want lots of output bits here
        CountingDigest digest = new CountingDigest(FBUtilities.newMessageDigest("SHA-256"));
        UnfilteredRowIterators.digest(null, partition, digest, MessagingService.current_version);
        // only return new hash for merkle tree in case digest was updated - see CASSANDRA-8979
        return digest.count > 0
             ? new MerkleTree.RowHash(partition.partitionKey().getToken(), digest.digest(), digest.count)
             : null;
    }

    /**
     * Finalises the trees and registers this validator with Stage.ANTI_ENTROPY, where run() sends them
     * to the initiator.
     */
    public void complete()
    {
        completeTree();

        // log before handing off, so the trees are not read here while run() serialises them on another thread
        if (logger.isDebugEnabled())
        {
            // log distribution of rows in tree
            logger.debug("Validated {} partitions for {}.  Partitions per leaf are:", validated, desc.sessionId);
            trees.logRowCountPerLeaf(logger);
            logger.debug("Validated {} partitions for {}.  Partition sizes are:", validated, desc.sessionId);
            trees.logRowSizePerLeaf(logger);
        }

        StageManager.getStage(Stage.ANTI_ENTROPY).execute(this);
    }

    /**
     * Ensures every leaf range that is still invalid has an initialised hash.
     * Must be called after prepare().
     */
    @VisibleForTesting
    public void completeTree()
    {
        assert ranges != null : "Validator was not prepared()";

        ranges = trees.invalids();

        while (ranges.hasNext())
        {
            range = ranges.next();
            range.ensureHashInitialised();
        }
    }

    /**
     * Called when an error happens during validation.
     * Sends the initiator a ValidationComplete message that carries no trees, to report that validation failed.
     * The actual reason for the failure should be looked up in the log of the host calling this function.
     */
    public void fail()
    {
        logger.error("Failed creating a merkle tree for {}, {} (see log for details)", desc, initiator);
        // a ValidationComplete without trees tells the initiator that validation failed
        MessagingService.instance().sendOneWay(new ValidationComplete(desc).createMessage(), initiator);
    }

    /**
     * Called after the validation lifecycle to respond with the now valid tree. Runs in Stage.ANTI_ENTROPY.
     */
    public void run()
    {
        // respond to the request that triggered this validation
        if (!initiator.equals(FBUtilities.getBroadcastAddress()))
        {
            logger.info("[repair #{}] Sending completed merkle tree to {} for {}.{}", desc.sessionId, initiator, desc.keyspace, desc.columnFamily);
            Tracing.traceRepair("Sending completed merkle tree to {} for {}.{}", initiator, desc.keyspace, desc.columnFamily);
        }
        MessagingService.instance().sendOneWay(new ValidationComplete(desc, trees).createMessage(), initiator);
    }
}