blob: e2b09bc9ee716346bf7deb592f017ecb4e2e59b6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.dht;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import com.google.common.base.Predicate;
import com.google.common.collect.Multimap;
import org.apache.cassandra.locator.EndpointsByRange;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.locator.AbstractNetworkTopologySnitch;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.Replica;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class RangeFetchMapCalculatorTest
{
@BeforeClass
public static void setupUpSnitch()
{
DatabaseDescriptor.daemonInitialization();
DatabaseDescriptor.setPartitionerUnsafe(RandomPartitioner.instance);
DatabaseDescriptor.setEndpointSnitch(new AbstractNetworkTopologySnitch()
{
//Odd IPs are in DC1 and Even are in DC2. Endpoints upto .14 will have unique racks and
// then will be same for a set of three.
@Override
public String getRack(InetAddressAndPort endpoint)
{
return "RAC1";
}
@Override
public String getDatacenter(InetAddressAndPort endpoint)
{
if (getIPLastPart(endpoint) <= 50)
return DatabaseDescriptor.getLocalDataCenter();
else if (getIPLastPart(endpoint) % 2 == 0)
return DatabaseDescriptor.getLocalDataCenter();
else
return DatabaseDescriptor.getLocalDataCenter() + "Remote";
}
private int getIPLastPart(InetAddressAndPort endpoint)
{
String str = endpoint.address.toString();
int index = str.lastIndexOf(".");
return Integer.parseInt(str.substring(index + 1).trim());
}
});
}
@Test
public void testWithSingleSource() throws Exception
{
EndpointsByRange.Builder rangesWithSources = new EndpointsByRange.Builder();
addNonTrivialRangeAndSources(rangesWithSources, 1, 10, "127.0.0.1");
addNonTrivialRangeAndSources(rangesWithSources, 11, 20, "127.0.0.2");
addNonTrivialRangeAndSources(rangesWithSources, 21, 30, "127.0.0.3");
addNonTrivialRangeAndSources(rangesWithSources, 31, 40, "127.0.0.4");
addNonTrivialRangeAndSources(rangesWithSources, 41, 50, "127.0.0.5");
RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources.build(), Collections.emptyList(), "Test");
Multimap<InetAddressAndPort, Range<Token>> map = calculator.getRangeFetchMap();
validateRange(rangesWithSources, map);
Assert.assertEquals(4, map.asMap().keySet().size());
}
@Test
public void testWithNonOverlappingSource() throws Exception
{
EndpointsByRange.Builder rangesWithSources = new EndpointsByRange.Builder();
addNonTrivialRangeAndSources(rangesWithSources, 1, 10, "127.0.0.1", "127.0.0.2");
addNonTrivialRangeAndSources(rangesWithSources, 11, 20, "127.0.0.3", "127.0.0.4");
addNonTrivialRangeAndSources(rangesWithSources, 21, 30, "127.0.0.5", "127.0.0.6");
addNonTrivialRangeAndSources(rangesWithSources, 31, 40, "127.0.0.7", "127.0.0.8");
addNonTrivialRangeAndSources(rangesWithSources, 41, 50, "127.0.0.9", "127.0.0.10");
RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources.build(), Collections.emptyList(), "Test");
Multimap<InetAddressAndPort, Range<Token>> map = calculator.getRangeFetchMap();
validateRange(rangesWithSources, map);
Assert.assertEquals(5, map.asMap().keySet().size());
}
@Test
public void testWithRFThreeReplacement() throws Exception
{
EndpointsByRange.Builder rangesWithSources = new EndpointsByRange.Builder();
addNonTrivialRangeAndSources(rangesWithSources, 1, 10, "127.0.0.1", "127.0.0.2");
addNonTrivialRangeAndSources(rangesWithSources, 11, 20, "127.0.0.2", "127.0.0.3");
addNonTrivialRangeAndSources(rangesWithSources, 21, 30, "127.0.0.3", "127.0.0.4");
RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources.build(), Collections.emptyList(), "Test");
Multimap<InetAddressAndPort, Range<Token>> map = calculator.getRangeFetchMap();
validateRange(rangesWithSources, map);
//We should validate that it streamed from 3 unique sources
Assert.assertEquals(3, map.asMap().keySet().size());
}
@Test
public void testForMultipleRoundsComputation() throws Exception
{
EndpointsByRange.Builder rangesWithSources = new EndpointsByRange.Builder();
addNonTrivialRangeAndSources(rangesWithSources, 1, 10, "127.0.0.3");
addNonTrivialRangeAndSources(rangesWithSources, 11, 20, "127.0.0.3");
addNonTrivialRangeAndSources(rangesWithSources, 21, 30, "127.0.0.3");
addNonTrivialRangeAndSources(rangesWithSources, 31, 40, "127.0.0.3");
addNonTrivialRangeAndSources(rangesWithSources, 41, 50, "127.0.0.3", "127.0.0.2");
RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources.build(), Collections.emptyList(), "Test");
Multimap<InetAddressAndPort, Range<Token>> map = calculator.getRangeFetchMap();
validateRange(rangesWithSources, map);
//We should validate that it streamed from 2 unique sources
Assert.assertEquals(2, map.asMap().keySet().size());
assertArrays(Arrays.asList(generateNonTrivialRange(1, 10), generateNonTrivialRange(11, 20), generateNonTrivialRange(21, 30), generateNonTrivialRange(31, 40)),
map.asMap().get(InetAddressAndPort.getByName("127.0.0.3")));
assertArrays(Arrays.asList(generateNonTrivialRange(41, 50)), map.asMap().get(InetAddressAndPort.getByName("127.0.0.2")));
}
@Test
public void testForMultipleRoundsComputationWithLocalHost() throws Exception
{
EndpointsByRange.Builder rangesWithSources = new EndpointsByRange.Builder();
addNonTrivialRangeAndSources(rangesWithSources, 1, 10, "127.0.0.1");
addNonTrivialRangeAndSources(rangesWithSources, 11, 20, "127.0.0.1");
addNonTrivialRangeAndSources(rangesWithSources, 21, 30, "127.0.0.1");
addNonTrivialRangeAndSources(rangesWithSources, 31, 40, "127.0.0.1");
addNonTrivialRangeAndSources(rangesWithSources, 41, 50, "127.0.0.1", "127.0.0.2");
RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources.build(), Collections.emptyList(), "Test");
Multimap<InetAddressAndPort, Range<Token>> map = calculator.getRangeFetchMap();
validateRange(rangesWithSources, map);
//We should validate that it streamed from only non local host and only one range
Assert.assertEquals(1, map.asMap().keySet().size());
assertArrays(Arrays.asList(generateNonTrivialRange(41, 50)), map.asMap().get(InetAddressAndPort.getByName("127.0.0.2")));
}
@Test
public void testForEmptyGraph() throws Exception
{
EndpointsByRange.Builder rangesWithSources = new EndpointsByRange.Builder();
addNonTrivialRangeAndSources(rangesWithSources, 1, 10, "127.0.0.1");
addNonTrivialRangeAndSources(rangesWithSources, 11, 20, "127.0.0.1");
addNonTrivialRangeAndSources(rangesWithSources, 21, 30, "127.0.0.1");
addNonTrivialRangeAndSources(rangesWithSources, 31, 40, "127.0.0.1");
addNonTrivialRangeAndSources(rangesWithSources, 41, 50, "127.0.0.1");
RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources.build(), Collections.emptyList(), "Test");
Multimap<InetAddressAndPort, Range<Token>> map = calculator.getRangeFetchMap();
//All ranges map to local host so we will not stream anything.
assertTrue(map.isEmpty());
}
@Test
public void testWithNoSourceWithLocal() throws Exception
{
EndpointsByRange.Builder rangesWithSources = new EndpointsByRange.Builder();
addNonTrivialRangeAndSources(rangesWithSources, 1, 10, "127.0.0.1", "127.0.0.5");
addNonTrivialRangeAndSources(rangesWithSources, 11, 20, "127.0.0.2");
addNonTrivialRangeAndSources(rangesWithSources, 21, 30, "127.0.0.3");
//Return false for all except 127.0.0.5
final RangeStreamer.SourceFilter filter = new RangeStreamer.SourceFilter()
{
public boolean apply(Replica replica)
{
try
{
if (replica.endpoint().equals(InetAddressAndPort.getByName("127.0.0.5")))
return false;
else
return true;
}
catch (UnknownHostException e)
{
return true;
}
}
public String message(Replica replica)
{
return "Doesn't match 127.0.0.5";
}
};
RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources.build(), Arrays.asList(filter), "Test");
Multimap<InetAddressAndPort, Range<Token>> map = calculator.getRangeFetchMap();
validateRange(rangesWithSources, map);
//We should validate that it streamed from only non local host and only one range
Assert.assertEquals(2, map.asMap().keySet().size());
assertArrays(Arrays.asList(generateNonTrivialRange(11, 20)), map.asMap().get(InetAddressAndPort.getByName("127.0.0.2")));
assertArrays(Arrays.asList(generateNonTrivialRange(21, 30)), map.asMap().get(InetAddressAndPort.getByName("127.0.0.3")));
}
@Test (expected = IllegalStateException.class)
public void testWithNoLiveSource() throws Exception
{
EndpointsByRange.Builder rangesWithSources = new EndpointsByRange.Builder();
addNonTrivialRangeAndSources(rangesWithSources, 1, 10, "127.0.0.5");
addNonTrivialRangeAndSources(rangesWithSources, 11, 20, "127.0.0.2");
addNonTrivialRangeAndSources(rangesWithSources, 21, 30, "127.0.0.3");
final RangeStreamer.SourceFilter allDeadFilter = new RangeStreamer.SourceFilter()
{
public boolean apply(Replica replica)
{
return false;
}
public String message(Replica replica)
{
return "All dead";
}
};
RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources.build(), Arrays.asList(allDeadFilter), "Test");
calculator.getRangeFetchMap();
}
@Test
public void testForLocalDC() throws Exception
{
EndpointsByRange.Builder rangesWithSources = new EndpointsByRange.Builder();
addNonTrivialRangeAndSources(rangesWithSources, 1, 10, "127.0.0.1", "127.0.0.3", "127.0.0.53");
addNonTrivialRangeAndSources(rangesWithSources, 11, 20, "127.0.0.1", "127.0.0.3", "127.0.0.57");
addNonTrivialRangeAndSources(rangesWithSources, 21, 30, "127.0.0.2", "127.0.0.59", "127.0.0.61");
RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources.build(), new ArrayList<>(), "Test");
Multimap<InetAddressAndPort, Range<Token>> map = calculator.getRangeFetchMap();
validateRange(rangesWithSources, map);
Assert.assertEquals(2, map.asMap().size());
//Should have streamed from local DC endpoints
assertArrays(Arrays.asList(generateNonTrivialRange(21, 30)), map.asMap().get(InetAddressAndPort.getByName("127.0.0.2")));
assertArrays(Arrays.asList(generateNonTrivialRange(1, 10), generateNonTrivialRange(11, 20)), map.asMap().get(InetAddressAndPort.getByName("127.0.0.3")));
}
@Test
public void testForRemoteDC() throws Exception
{
EndpointsByRange.Builder rangesWithSources = new EndpointsByRange.Builder();
addNonTrivialRangeAndSources(rangesWithSources, 1, 10, "127.0.0.3", "127.0.0.51");
addNonTrivialRangeAndSources(rangesWithSources, 11, 20, "127.0.0.3", "127.0.0.55");
addNonTrivialRangeAndSources(rangesWithSources, 21, 30, "127.0.0.2", "127.0.0.59");
//Reject only 127.0.0.3 and accept everyone else
final RangeStreamer.SourceFilter localHostFilter = new RangeStreamer.SourceFilter()
{
public boolean apply(Replica replica)
{
try
{
if (replica.endpoint().equals(InetAddressAndPort.getByName("127.0.0.3")))
return false;
else
return true;
}
catch (UnknownHostException e)
{
return true;
}
}
public String message(Replica replica)
{
return "Not 127.0.0.3";
}
};
RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources.build(), Arrays.asList(localHostFilter), "Test");
Multimap<InetAddressAndPort, Range<Token>> map = calculator.getRangeFetchMap();
validateRange(rangesWithSources, map);
Assert.assertEquals(3, map.asMap().size());
//Should have streamed from remote DC endpoint
assertArrays(Arrays.asList(generateNonTrivialRange(1, 10)), map.asMap().get(InetAddressAndPort.getByName("127.0.0.51")));
assertArrays(Arrays.asList(generateNonTrivialRange(11, 20)), map.asMap().get(InetAddressAndPort.getByName("127.0.0.55")));
assertArrays(Arrays.asList(generateNonTrivialRange(21, 30)), map.asMap().get(InetAddressAndPort.getByName("127.0.0.2")));
}
@Test
public void testTrivialRanges() throws UnknownHostException
{
EndpointsByRange.Builder rangesWithSources = new EndpointsByRange.Builder();
// add non-trivial ranges
addNonTrivialRangeAndSources(rangesWithSources, 1, 10, "127.0.0.3", "127.0.0.51");
addNonTrivialRangeAndSources(rangesWithSources, 11, 20, "127.0.0.3", "127.0.0.55");
addNonTrivialRangeAndSources(rangesWithSources, 21, 30, "127.0.0.2", "127.0.0.59");
// and a trivial one:
addTrivialRangeAndSources(rangesWithSources, 1, 10, "127.0.0.3", "127.0.0.51");
RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources.build(), Collections.emptyList(), "Test");
Multimap<InetAddressAndPort, Range<Token>> optMap = calculator.getRangeFetchMapForNonTrivialRanges();
Multimap<InetAddressAndPort, Range<Token>> trivialMap = calculator.getRangeFetchMapForTrivialRanges(optMap);
assertTrue(trivialMap.get(InetAddressAndPort.getByName("127.0.0.3")).contains(generateTrivialRange(1,10)) ^
trivialMap.get(InetAddressAndPort.getByName("127.0.0.51")).contains(generateTrivialRange(1,10)));
assertFalse(optMap.containsKey(generateTrivialRange(1, 10)));
}
@Test(expected = IllegalStateException.class)
public void testNotEnoughEndpointsForTrivialRange() throws UnknownHostException
{
EndpointsByRange.Builder rangesWithSources = new EndpointsByRange.Builder();
// add non-trivial ranges
addNonTrivialRangeAndSources(rangesWithSources, 1, 10, "127.0.0.3", "127.0.0.51");
addNonTrivialRangeAndSources(rangesWithSources, 11, 20, "127.0.0.3", "127.0.0.55");
addNonTrivialRangeAndSources(rangesWithSources, 21, 30, "127.0.0.2", "127.0.0.59");
// and a trivial one:
addTrivialRangeAndSources(rangesWithSources, 1, 10, "127.0.0.3");
RangeStreamer.SourceFilter filter = new RangeStreamer.SourceFilter()
{
public boolean apply(Replica replica)
{
try
{
if (replica.endpoint().equals(InetAddressAndPort.getByName("127.0.0.3")))
return false;
}
catch (UnknownHostException e)
{
throw new RuntimeException(e);
}
return true;
}
public String message(Replica replica)
{
return "Not 127.0.0.3";
}
};
RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources.build(), Collections.singleton(filter), "Test");
Multimap<InetAddressAndPort, Range<Token>> optMap = calculator.getRangeFetchMapForNonTrivialRanges();
Multimap<InetAddressAndPort, Range<Token>> trivialMap = calculator.getRangeFetchMapForTrivialRanges(optMap);
}
private void assertArrays(Collection<Range<Token>> expected, Collection<Range<Token>> result)
{
Assert.assertEquals(expected.size(), result.size());
assertTrue(result.containsAll(expected));
}
private void validateRange(EndpointsByRange.Builder rangesWithSources, Multimap<InetAddressAndPort, Range<Token>> result)
{
for (Map.Entry<InetAddressAndPort, Range<Token>> entry : result.entries())
{
assertTrue(rangesWithSources.get(entry.getValue()).endpoints().contains(entry.getKey()));
}
}
private void addNonTrivialRangeAndSources(EndpointsByRange.Builder rangesWithSources, int left, int right, String... hosts) throws UnknownHostException
{
for (InetAddressAndPort endpoint : makeAddrs(hosts))
{
Range<Token> range = generateNonTrivialRange(left, right);
rangesWithSources.put(range, Replica.fullReplica(endpoint, range));
}
}
private void addTrivialRangeAndSources(EndpointsByRange.Builder rangesWithSources, int left, int right, String... hosts) throws UnknownHostException
{
for (InetAddressAndPort endpoint : makeAddrs(hosts))
{
Range<Token> range = generateTrivialRange(left, right);
rangesWithSources.put(range, Replica.fullReplica(endpoint, range));
}
}
private Collection<InetAddressAndPort> makeAddrs(String... hosts) throws UnknownHostException
{
ArrayList<InetAddressAndPort> addrs = new ArrayList<>(hosts.length);
for (String host : hosts)
addrs.add(InetAddressAndPort.getByName(host));
return addrs;
}
private Range<Token> generateNonTrivialRange(int left, int right)
{
// * 1000 to make sure we dont filter away any trivial ranges:
return new Range<>(new RandomPartitioner.BigIntegerToken(String.valueOf(left * 10000)), new RandomPartitioner.BigIntegerToken(String.valueOf(right * 10000)));
}
private Range<Token> generateTrivialRange(int left, int right)
{
Range<Token> r = new Range<>(new RandomPartitioner.BigIntegerToken(String.valueOf(left)), new RandomPartitioner.BigIntegerToken(String.valueOf(right)));
assertTrue(RangeFetchMapCalculator.isTrivial(r));
return r;
}
}