blob: 1c4bca7a2133d0a0b0ce48bd29293681fbdf49d0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.repair.asymmetric;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.locator.InetAddressAndPort;
/**
* Basic idea is that we track incoming ranges instead of blindly just exchanging the ranges that mismatch between two nodes
*
* Say node X has tracked that it will stream range r1 from node Y. Now we see find a diffing range
* r1 between node X and Z. When adding r1 from Z as an incoming to X we check if Y and Z are equal on range r (ie, there is
* no difference between them). If they are equal X can stream from Y or Z and the end result will be the same.
*
* The ranges wont match perfectly since we don't iterate over leaves so we always split based on the
* smallest range (either the new difference or the existing one)
*/
public class ReduceHelper
{
/**
* Reduces the differences provided by the merkle trees to a minimum set of differences
*/
public static ImmutableMap<InetAddressAndPort, HostDifferences> reduce(DifferenceHolder differences, PreferedNodeFilter filter)
{
Map<InetAddressAndPort, IncomingRepairStreamTracker> trackers = createIncomingRepairStreamTrackers(differences);
ImmutableMap.Builder<InetAddressAndPort, HostDifferences> mapBuilder = ImmutableMap.builder();
for (Map.Entry<InetAddressAndPort, IncomingRepairStreamTracker> trackerEntry : trackers.entrySet())
{
IncomingRepairStreamTracker tracker = trackerEntry.getValue();
HostDifferences rangesToFetch = new HostDifferences();
for (Map.Entry<Range<Token>, StreamFromOptions> entry : tracker.getIncoming().entrySet())
{
Range<Token> rangeToFetch = entry.getKey();
// StreamFromOptions contains a Set<Set<InetAddress>> with endpoints we need to stream
// rangeToFetch from - if the inner set size > 1 means those endpoints are identical
// for the range. pickConsistent picks a single endpoint from each of these sets.
for (InetAddressAndPort remoteNode : pickConsistent(trackerEntry.getKey(), entry.getValue(), filter))
rangesToFetch.addSingleRange(remoteNode, rangeToFetch);
}
mapBuilder.put(trackerEntry.getKey(), rangesToFetch);
}
return mapBuilder.build();
}
@VisibleForTesting
static Map<InetAddressAndPort, IncomingRepairStreamTracker> createIncomingRepairStreamTrackers(DifferenceHolder differences)
{
Map<InetAddressAndPort, IncomingRepairStreamTracker> trackers = new HashMap<>();
for (InetAddressAndPort hostWithDifference : differences.keyHosts())
{
HostDifferences hostDifferences = differences.get(hostWithDifference);
for (InetAddressAndPort differingHost : hostDifferences.hosts())
{
Iterable<Range<Token>> differingRanges = hostDifferences.get(differingHost);
// hostWithDifference has mismatching ranges differingRanges with differingHost:
for (Range<Token> range : differingRanges)
{
// a difference means that we need to sync that range between two nodes - add the diffing range to both
// hosts:
getTracker(differences, trackers, hostWithDifference).addIncomingRangeFrom(range, differingHost);
getTracker(differences, trackers, differingHost).addIncomingRangeFrom(range, hostWithDifference);
}
}
}
return trackers;
}
private static IncomingRepairStreamTracker getTracker(DifferenceHolder differences,
Map<InetAddressAndPort, IncomingRepairStreamTracker> trackers,
InetAddressAndPort host)
{
return trackers.computeIfAbsent(host, (h) -> new IncomingRepairStreamTracker(differences));
}
private static final Comparator<InetAddressAndPort> comparator = Comparator.comparing(InetAddressAndPort::getHostAddressAndPort);
/**
* Consistently picks the node after the streaming node to stream from
*
* this is done to reduce the amount of sstables created on the receiving node
*
* todo: note that this can be improved - if we have a case like:
* addr3 will stream range1 from addr1 or addr2
* range2 from addr1
* in a perfect world we would stream both range1 and range2 from addr1 - but in this case we might stream
* range1 from addr2 depending on how the addresses sort
*/
private static Collection<InetAddressAndPort> pickConsistent(InetAddressAndPort streamingNode,
StreamFromOptions toStreamFrom,
PreferedNodeFilter filter)
{
Set<InetAddressAndPort> retSet = new HashSet<>();
for (Set<InetAddressAndPort> toStream : toStreamFrom.allStreams())
{
List<InetAddressAndPort> toSearch = new ArrayList<>(filter.apply(streamingNode, toStream));
if (toSearch.isEmpty())
toSearch = new ArrayList<>(toStream);
toSearch.sort(comparator);
int pos = Collections.binarySearch(toSearch, streamingNode, comparator);
assert pos < 0;
pos = -pos - 1;
if (pos == toSearch.size())
retSet.add(toSearch.get(0));
else
retSet.add(toSearch.get(pos));
}
return retSet;
}
}