blob: 7bb2b874ea826d56fbff1a0f291a5ea7a4b5cb38 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.service;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableMap;
import org.apache.cassandra.locator.EndpointsByReplica;
import org.apache.cassandra.locator.EndpointsForRange;
import org.junit.After;
import org.junit.BeforeClass;
import org.junit.Test;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.dht.OrderPreservingPartitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.RangeStreamer;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.locator.AbstractEndpointSnitch;
import org.apache.cassandra.locator.AbstractReplicationStrategy;
import org.apache.cassandra.locator.IEndpointSnitch;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.RangesAtEndpoint;
import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.locator.ReplicaCollection;
import org.apache.cassandra.locator.SimpleStrategy;
import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;
import static org.apache.cassandra.locator.Replica.fullReplica;
import static org.apache.cassandra.locator.Replica.transientReplica;
import static org.apache.cassandra.service.StorageServiceTest.assertMultimapEqualsIgnoreOrder;
/**
* This is also fairly effectively testing source retrieval for bootstrap as well since RangeStreamer
* is used to calculate the endpoints to fetch from and check they are alive for both RangeRelocator (move) and
* bootstrap (RangeRelocator).
*/
public class BootstrapTransientTest
{
static InetAddressAndPort address02;
static InetAddressAndPort address03;
static InetAddressAndPort address04;
static InetAddressAndPort address05;
@BeforeClass
public static void setUpClass() throws Exception
{
address02 = InetAddressAndPort.getByName("127.0.0.2");
address03 = InetAddressAndPort.getByName("127.0.0.3");
address04 = InetAddressAndPort.getByName("127.0.0.4");
address05 = InetAddressAndPort.getByName("127.0.0.5");
}
private final List<InetAddressAndPort> downNodes = new ArrayList<>();
final RangeStreamer.SourceFilter alivePredicate = new RangeStreamer.SourceFilter()
{
public boolean apply(Replica replica)
{
return !downNodes.contains(replica.endpoint());
}
public String message(Replica replica)
{
return "Down nodes: " + downNodes;
}
};
final RangeStreamer.SourceFilter sourceFilterDownNodesPredicate = new RangeStreamer.SourceFilter()
{
public boolean apply(Replica replica)
{
return !sourceFilterDownNodes.contains(replica.endpoint());
}
public String message(Replica replica)
{
return "Source filter down nodes" + sourceFilterDownNodes;
}
};
private final List<InetAddressAndPort> sourceFilterDownNodes = new ArrayList<>();
private final Collection<RangeStreamer.SourceFilter> sourceFilters = Arrays.asList(alivePredicate,
sourceFilterDownNodesPredicate,
new RangeStreamer.ExcludeLocalNodeFilter()
);
@After
public void clearDownNode()
{
// TODO: actually use these
downNodes.clear();
sourceFilterDownNodes.clear();
}
@BeforeClass
public static void setupDD()
{
DatabaseDescriptor.daemonInitialization();
}
Token tenToken = new OrderPreservingPartitioner.StringToken("00010");
Token twentyToken = new OrderPreservingPartitioner.StringToken("00020");
Token thirtyToken = new OrderPreservingPartitioner.StringToken("00030");
Token fourtyToken = new OrderPreservingPartitioner.StringToken("00040");
Range<Token> range30_10 = new Range<>(thirtyToken, tenToken);
Range<Token> range10_20 = new Range<>(tenToken, twentyToken);
Range<Token> range20_30 = new Range<>(twentyToken, thirtyToken);
Range<Token> range30_40 = new Range<>(thirtyToken, fourtyToken);
RangesAtEndpoint toFetch = RangesAtEndpoint.of(new Replica(address05, range30_40, true),
new Replica(address05, range20_30, true),
new Replica(address05, range10_20, false));
public EndpointsForRange endpoints(Replica... replicas)
{
assert replicas.length > 0;
Range<Token> range = replicas[0].range();
EndpointsForRange.Builder builder = EndpointsForRange.builder(range);
for (Replica r : replicas)
{
assert r.range().equals(range);
builder.add(r);
}
return builder.build();
}
@Test
public void testRangeStreamerRangesToFetch() throws Exception
{
EndpointsByReplica expectedResult = new EndpointsByReplica(ImmutableMap.of(
transientReplica(address05, range10_20), endpoints(transientReplica(address02, range10_20)),
fullReplica(address05, range20_30), endpoints(transientReplica(address03, range20_30), fullReplica(address04, range20_30)),
fullReplica(address05, range30_40), endpoints(transientReplica(address04, range30_10), fullReplica(address02, range30_10))));
invokeCalculateRangesToFetchWithPreferredEndpoints(toFetch, constructTMDs(), expectedResult);
}
private Pair<TokenMetadata, TokenMetadata> constructTMDs()
{
TokenMetadata tmd = new TokenMetadata();
tmd.updateNormalToken(range30_10.right, address02);
tmd.updateNormalToken(range10_20.right, address03);
tmd.updateNormalToken(range20_30.right, address04);
TokenMetadata updated = tmd.cloneOnlyTokenMap();
updated.updateNormalToken(range30_40.right, address05);
return Pair.create(tmd, updated);
}
private void invokeCalculateRangesToFetchWithPreferredEndpoints(ReplicaCollection<?> toFetch,
Pair<TokenMetadata, TokenMetadata> tmds,
EndpointsByReplica expectedResult)
{
DatabaseDescriptor.setTransientReplicationEnabledUnsafe(true);
EndpointsByReplica result = RangeStreamer.calculateRangesToFetchWithPreferredEndpoints((address, replicas) -> replicas,
simpleStrategy(tmds.left),
toFetch,
true,
tmds.left,
tmds.right,
"TestKeyspace",
sourceFilters);
result.asMap().forEach((replica, list) -> System.out.printf("Replica %s, sources %s%n", replica, list));
assertMultimapEqualsIgnoreOrder(expectedResult, result);
}
private AbstractReplicationStrategy simpleStrategy(TokenMetadata tmd)
{
IEndpointSnitch snitch = new AbstractEndpointSnitch()
{
public int compareEndpoints(InetAddressAndPort target, Replica r1, Replica r2)
{
return 0;
}
public String getRack(InetAddressAndPort endpoint)
{
return "R1";
}
public String getDatacenter(InetAddressAndPort endpoint)
{
return "DC1";
}
};
return new SimpleStrategy("MoveTransientTest",
tmd,
snitch,
com.google.common.collect.ImmutableMap.of("replication_factor", "3/1"));
}
}