blob: be5a553ba5f9475549fdc7f1a3e905058473b518 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.repair.asymmetric;
import java.net.UnknownHostException;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Set;
import java.util.TreeSet;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import org.junit.Test;
import org.apache.cassandra.dht.Murmur3Partitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.locator.InetAddressAndPort;
import static junit.framework.TestCase.fail;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class ReduceHelperTest
{
private static final InetAddressAndPort[] addresses;
private static final InetAddressAndPort A;
private static final InetAddressAndPort B;
private static final InetAddressAndPort C;
private static final InetAddressAndPort D;
private static final InetAddressAndPort E;
static
{
try
{
A = InetAddressAndPort.getByName("127.0.0.0");
B = InetAddressAndPort.getByName("127.0.0.1");
C = InetAddressAndPort.getByName("127.0.0.2");
D = InetAddressAndPort.getByName("127.0.0.3");
E = InetAddressAndPort.getByName("127.0.0.4");
// for diff creation in loops:
addresses = new InetAddressAndPort[]{ A, B, C, D, E };
}
catch (UnknownHostException e)
{
throw new RuntimeException(e);
}
}
@Test
public void testSimpleReducing()
{
/*
A == B and D == E =>
A streams from C, {D, E} since D==E
B streams from C, {D, E} since D==E
C streams from {A, B}, {D, E} since A==B and D==E
D streams from {A, B}, C since A==B
E streams from {A, B}, C since A==B
A B C D E
A = x x x
B x x x
C x x
D =
*/
Map<InetAddressAndPort, HostDifferences> differences = new HashMap<>();
for (int i = 0; i < 4; i++)
{
HostDifferences hostDiffs = new HostDifferences();
for (int j = i + 1; j < 5; j++)
{
// no diffs between A, B and D, E:
if (addresses[i] == A && addresses[j] == B || addresses[i] == D && addresses[j] == E)
continue;
List<Range<Token>> diff = list(new Range<>(new Murmur3Partitioner.LongToken(0), new Murmur3Partitioner.LongToken(10)));
hostDiffs.add(addresses[j], diff);
}
differences.put(addresses[i], hostDiffs);
}
DifferenceHolder differenceHolder = new DifferenceHolder(differences);
Map<InetAddressAndPort, IncomingRepairStreamTracker> tracker = ReduceHelper.createIncomingRepairStreamTrackers(differenceHolder);
assertEquals(set(set(C), set(E,D)), streams(tracker.get(A)));
assertEquals(set(set(C), set(E,D)), streams(tracker.get(B)));
assertEquals(set(set(A,B), set(E,D)), streams(tracker.get(C)));
assertEquals(set(set(A,B), set(C)), streams(tracker.get(D)));
assertEquals(set(set(A,B), set(C)), streams(tracker.get(E)));
ImmutableMap<InetAddressAndPort, HostDifferences> reduced = ReduceHelper.reduce(differenceHolder, (x, y) -> y);
HostDifferences n0 = reduced.get(A);
assertEquals(0, n0.get(A).size());
assertEquals(0, n0.get(B).size());
assertTrue(n0.get(C).size() > 0);
assertStreamFromEither(n0.get(D), n0.get(E));
HostDifferences n1 = reduced.get(B);
assertEquals(0, n1.get(A).size());
assertEquals(0, n1.get(B).size());
assertTrue(n1.get(C).size() > 0);
assertStreamFromEither(n1.get(D), n1.get(E));
HostDifferences n2 = reduced.get(C);
// we are either streaming from node 0 or node 1, not both:
assertStreamFromEither(n2.get(A), n2.get(B));
assertEquals(0, n2.get(C).size());
assertStreamFromEither(n2.get(D), n2.get(E));
HostDifferences n3 = reduced.get(D);
assertStreamFromEither(n3.get(A), n3.get(B));
assertTrue(n3.get(C).size() > 0);
assertEquals(0, n3.get(D).size());
assertEquals(0, n3.get(E).size());
HostDifferences n4 = reduced.get(E);
assertStreamFromEither(n4.get(A), n4.get(B));
assertTrue(n4.get(C).size() > 0);
assertEquals(0, n4.get(D).size());
assertEquals(0, n4.get(E).size());
}
@Test
public void testSimpleReducingWithPreferedNodes()
{
/*
A == B and D == E =>
A streams from C, {D, E} since D==E
B streams from C, {D, E} since D==E
C streams from {A, B}, {D, E} since A==B and D==E
D streams from {A, B}, C since A==B
E streams from {A, B}, C since A==B
A B C D E
A = x x x
B x x x
C x x
D =
*/
Map<InetAddressAndPort, HostDifferences> differences = new HashMap<>();
for (int i = 0; i < 4; i++)
{
HostDifferences hostDifferences = new HostDifferences();
for (int j = i + 1; j < 5; j++)
{
// no diffs between A, B and D, E:
if (addresses[i] == A && addresses[j] == B || addresses[i] == D && addresses[j] == E)
continue;
List<Range<Token>> diff = list(new Range<>(new Murmur3Partitioner.LongToken(0), new Murmur3Partitioner.LongToken(10)));
hostDifferences.add(addresses[j], diff);
}
differences.put(addresses[i], hostDifferences);
}
DifferenceHolder differenceHolder = new DifferenceHolder(differences);
Map<InetAddressAndPort, IncomingRepairStreamTracker> tracker = ReduceHelper.createIncomingRepairStreamTrackers(differenceHolder);
assertEquals(set(set(C), set(E, D)), streams(tracker.get(A)));
assertEquals(set(set(C), set(E, D)), streams(tracker.get(B)));
assertEquals(set(set(A, B), set(E, D)), streams(tracker.get(C)));
assertEquals(set(set(A, B), set(C)), streams(tracker.get(D)));
assertEquals(set(set(A, B), set(C)), streams(tracker.get(E)));
// if there is an option, never stream from node 1:
ImmutableMap<InetAddressAndPort, HostDifferences> reduced = ReduceHelper.reduce(differenceHolder, (x,y) -> Sets.difference(y, set(B)));
HostDifferences n0 = reduced.get(A);
assertEquals(0, n0.get(A).size());
assertEquals(0, n0.get(B).size());
assertTrue(n0.get(C).size() > 0);
assertStreamFromEither(n0.get(D), n0.get(E));
HostDifferences n1 = reduced.get(B);
assertEquals(0, n1.get(A).size());
assertEquals(0, n1.get(B).size());
assertTrue(n1.get(C).size() > 0);
assertStreamFromEither(n1.get(D), n1.get(E));
HostDifferences n2 = reduced.get(C);
assertTrue(n2.get(A).size() > 0);
assertEquals(0, n2.get(B).size());
assertEquals(0, n2.get(C).size());
assertStreamFromEither(n2.get(D), n2.get(E));
HostDifferences n3 = reduced.get(D);
assertTrue(n3.get(A).size() > 0);
assertEquals(0, n3.get(B).size());
assertTrue(n3.get(C).size() > 0);
assertEquals(0, n3.get(D).size());
assertEquals(0, n3.get(E).size());
HostDifferences n4 = reduced.get(E);
assertTrue(n4.get(A).size() > 0);
assertEquals(0, n4.get(B).size());
assertTrue(n4.get(C).size() > 0);
assertEquals(0, n4.get(D).size());
assertEquals(0, n4.get(E).size());
}
private Iterable<Set<InetAddressAndPort>> streams(IncomingRepairStreamTracker incomingRepairStreamTracker)
{
return incomingRepairStreamTracker.getIncoming().values().iterator().next().allStreams();
}
@Test
public void testOverlapDifference()
{
/*
|A |B |C
---+------+------+--------
A |= |50,100|0,50
B | |= |0,100
C | | |=
A needs to stream (50, 100] from B, (0, 50] from C
B needs to stream (50, 100] from A, (0, 100] from C
C needs to stream (0, 50] from A, (0, 100] from B
A == B on (0, 50] => C can stream (0, 50] from either A or B
A == C on (50, 100] => B can stream (50, 100] from either A or C
=>
A streams (50, 100] from {B}, (0, 50] from C
B streams (0, 50] from {C}, (50, 100] from {A, C}
C streams (0, 50] from {A, B}, (50, 100] from B
*/
Map<InetAddressAndPort, HostDifferences> differences = new HashMap<>();
addDifference(A, differences, B, list(range(50, 100)));
addDifference(A, differences, C, list(range(0, 50)));
addDifference(B, differences, C, list(range(0, 100)));
DifferenceHolder differenceHolder = new DifferenceHolder(differences);
Map<InetAddressAndPort, IncomingRepairStreamTracker> tracker = ReduceHelper.createIncomingRepairStreamTrackers(differenceHolder);
assertEquals(set(set(C)), tracker.get(A).getIncoming().get(range(0, 50)).allStreams());
assertEquals(set(set(B)), tracker.get(A).getIncoming().get(range(50, 100)).allStreams());
assertEquals(set(set(C)), tracker.get(B).getIncoming().get(range(0, 50)).allStreams());
assertEquals(set(set(A,C)), tracker.get(B).getIncoming().get(range(50, 100)).allStreams());
assertEquals(set(set(A,B)), tracker.get(C).getIncoming().get(range(0, 50)).allStreams());
assertEquals(set(set(B)), tracker.get(C).getIncoming().get(range(50, 100)).allStreams());
ImmutableMap<InetAddressAndPort, HostDifferences> reduced = ReduceHelper.reduce(differenceHolder, (x, y) -> y);
HostDifferences n0 = reduced.get(A);
assertTrue(n0.get(B).equals(set(range(50, 100))));
assertTrue(n0.get(C).equals(set(range(0, 50))));
HostDifferences n1 = reduced.get(B);
assertEquals(0, n1.get(B).size());
if (!n1.get(A).isEmpty())
{
assertTrue(n1.get(C).equals(set(range(0, 50))));
assertTrue(n1.get(A).equals(set(range(50, 100))));
}
else
{
assertTrue(n1.get(C).equals(set(range(0, 50), range(50, 100))));
}
HostDifferences n2 = reduced.get(C);
assertEquals(0, n2.get(C).size());
if (!n2.get(A).isEmpty())
{
assertTrue(n2.get(A).equals(set(range(0,50))));
assertTrue(n2.get(B).equals(set(range(50, 100))));
}
else
{
assertTrue(n2.get(A).equals(set(range(0, 50), range(50, 100))));
}
}
@Test
public void testOverlapDifference2()
{
/*
|A |B |C
---+----------------+----------------+------------------
A |= |5,45 |0,10 40,50
B | |= |0,5 10,40 45,50
C | | |=
A needs to stream (5, 45] from B, (0, 10], (40, 50) from C
B needs to stream (5, 45] from A, (0, 5], (10, 40], (45, 50] from C
C needs to stream (0, 10], (40,50] from A, (0,5], (10,40], (45,50] from B
A == B on (0, 5], (45, 50]
A == C on (10, 40]
B == C on (5, 10], (40, 45]
*/
Map<InetAddressAndPort, HostDifferences> differences = new HashMap<>();
addDifference(A, differences, B, list(range(5, 45)));
addDifference(A, differences, C, list(range(0, 10), range(40,50)));
addDifference(B, differences, C, list(range(0, 5), range(10,40), range(45,50)));
DifferenceHolder differenceHolder = new DifferenceHolder(differences);
Map<InetAddressAndPort, IncomingRepairStreamTracker> tracker = ReduceHelper.createIncomingRepairStreamTrackers(differenceHolder);
Map<Range<Token>, StreamFromOptions> ranges = tracker.get(A).getIncoming();
assertEquals(5, ranges.size());
assertEquals(set(set(C)), ranges.get(range(0, 5)).allStreams());
assertEquals(set(set(B, C)), ranges.get(range(5, 10)).allStreams());
assertEquals(set(set(B)), ranges.get(range(10, 40)).allStreams());
assertEquals(set(set(B, C)), ranges.get(range(40, 45)).allStreams());
assertEquals(set(set(C)), ranges.get(range(45, 50)).allStreams());
ranges = tracker.get(B).getIncoming();
assertEquals(5, ranges.size());
assertEquals(set(set(C)), ranges.get(range(0, 5)).allStreams());
assertEquals(set(set(A)), ranges.get(range(5, 10)).allStreams());
assertEquals(set(set(A, C)), ranges.get(range(10, 40)).allStreams());
assertEquals(set(set(A)), ranges.get(range(40, 45)).allStreams());
assertEquals(set(set(C)), ranges.get(range(45, 50)).allStreams());
ranges = tracker.get(C).getIncoming();
assertEquals(5, ranges.size());
assertEquals(set(set(A, B)), ranges.get(range(0, 5)).allStreams());
assertEquals(set(set(A)), ranges.get(range(5, 10)).allStreams());
assertEquals(set(set(B)), ranges.get(range(10, 40)).allStreams());
assertEquals(set(set(A)), ranges.get(range(40, 45)).allStreams());
assertEquals(set(set(A,B)), ranges.get(range(45, 50)).allStreams());
ImmutableMap<InetAddressAndPort, HostDifferences> reduced = ReduceHelper.reduce(differenceHolder, (x, y) -> y);
assertNoOverlap(A, reduced.get(A), list(range(0, 50)));
assertNoOverlap(B, reduced.get(B), list(range(0, 50)));
assertNoOverlap(C, reduced.get(C), list(range(0, 50)));
}
private void assertNoOverlap(InetAddressAndPort incomingNode, HostDifferences node, List<Range<Token>> expectedAfterNormalize)
{
Set<Range<Token>> allRanges = new HashSet<>();
Set<InetAddressAndPort> remoteNodes = Sets.newHashSet(A,B,C);
remoteNodes.remove(incomingNode);
Iterator<InetAddressAndPort> iter = remoteNodes.iterator();
allRanges.addAll(node.get(iter.next()));
InetAddressAndPort i = iter.next();
for (Range<Token> r : node.get(i))
{
for (Range<Token> existing : allRanges)
if (r.intersects(existing))
fail();
}
allRanges.addAll(node.get(i));
List<Range<Token>> normalized = Range.normalize(allRanges);
assertEquals(expectedAfterNormalize, normalized);
}
@SafeVarargs
private static List<Range<Token>> list(Range<Token> r, Range<Token> ... rs)
{
List<Range<Token>> ranges = new ArrayList<>();
ranges.add(r);
Collections.addAll(ranges, rs);
return ranges;
}
private static Set<InetAddressAndPort> set(InetAddressAndPort ... elem)
{
return Sets.newHashSet(elem);
}
@SafeVarargs
private static Set<Set<InetAddressAndPort>> set(Set<InetAddressAndPort> ... elem)
{
Set<Set<InetAddressAndPort>> ret = Sets.newHashSet();
ret.addAll(Arrays.asList(elem));
return ret;
}
@SafeVarargs
private static NavigableSet<Range<Token>> set(Range<Token> ... ranges)
{
NavigableSet<Range<Token>> res = new TreeSet<>(Comparator.comparing(o -> o.left));
res.addAll(Arrays.asList(ranges));
return res;
}
static Murmur3Partitioner.LongToken longtok(long l)
{
return new Murmur3Partitioner.LongToken(l);
}
static Range<Token> range(long t, long t2)
{
return new Range<>(longtok(t), longtok(t2));
}
static Map.Entry<Range<Token>, StreamFromOptions> rangeEntry(long t, long t2)
{
return new AbstractMap.SimpleEntry<>(range(t, t2), new StreamFromOptions(null, null));
}
@Test
public void testSubtractAllRanges()
{
Set<Map.Entry<Range<Token>, StreamFromOptions>> ranges = new HashSet<>();
ranges.add(rangeEntry(10, 20)); ranges.add(rangeEntry(40, 60));
assertEquals(0, RangeDenormalizer.subtractFromAllRanges(ranges, range(0, 100)).size());
ranges.add(rangeEntry(90, 110));
assertEquals(Sets.newHashSet(range(100, 110)), RangeDenormalizer.subtractFromAllRanges(ranges, range(0, 100)));
ranges.add(rangeEntry(-10, 10));
assertEquals(Sets.newHashSet(range(-10, 0), range(100, 110)), RangeDenormalizer.subtractFromAllRanges(ranges, range(0, 100)));
}
private void assertStreamFromEither(Collection<Range<Token>> r1, Collection<Range<Token>> r2)
{
assertTrue(r1.size() > 0 ^ r2.size() > 0);
}
private void addDifference(InetAddressAndPort host1, Map<InetAddressAndPort, HostDifferences> differences, InetAddressAndPort host2, List<Range<Token>> ranges)
{
differences.computeIfAbsent(host1, (x) -> new HostDifferences()).add(host2, ranges);
}
}