blob: dda6863153a39d516c95556d6669e5a298d32dc1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.dht;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Multimap;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.gms.IFailureDetector;
import org.apache.cassandra.locator.AbstractReplicationStrategy;
import org.apache.cassandra.locator.Endpoints;
import org.apache.cassandra.locator.EndpointsByRange;
import org.apache.cassandra.locator.EndpointsByReplica;
import org.apache.cassandra.locator.EndpointsForRange;
import org.apache.cassandra.locator.IEndpointSnitch;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.LocalStrategy;
import org.apache.cassandra.locator.NetworkTopologyStrategy;
import org.apache.cassandra.locator.RangesAtEndpoint;
import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.locator.ReplicaCollection;
import org.apache.cassandra.locator.ReplicaCollection.Builder.Conflict;
import org.apache.cassandra.locator.Replicas;
import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.streaming.PreviewKind;
import org.apache.cassandra.streaming.StreamOperation;
import org.apache.cassandra.streaming.StreamPlan;
import org.apache.cassandra.streaming.StreamResultFuture;
import org.apache.cassandra.utils.FBUtilities;
import static com.google.common.base.Predicates.and;
import static com.google.common.base.Predicates.not;
import static com.google.common.collect.Iterables.all;
import static com.google.common.collect.Iterables.any;
import static org.apache.cassandra.locator.Replica.fullReplica;
/**
* Assists in streaming ranges to this node.
*/
public class RangeStreamer
{
private static final Logger logger = LoggerFactory.getLogger(RangeStreamer.class);
public static Predicate<Replica> ALIVE_PREDICATE = replica ->
(!Gossiper.instance.isEnabled() ||
(Gossiper.instance.getEndpointStateForEndpoint(replica.endpoint()) == null ||
Gossiper.instance.getEndpointStateForEndpoint(replica.endpoint()).isAlive())) &&
FailureDetector.instance.isAlive(replica.endpoint());
/* bootstrap tokens. can be null if replacing the node. */
private final Collection<Token> tokens;
/* current token ring */
private final TokenMetadata metadata;
/* address of this node */
private final InetAddressAndPort address;
/* streaming description */
private final String description;
private final Map<String, Multimap<InetAddressAndPort, FetchReplica>> toFetch = new HashMap<>();
private final List<SourceFilter> sourceFilters = new ArrayList<>();
private final StreamPlan streamPlan;
private final boolean useStrictConsistency;
private final IEndpointSnitch snitch;
private final StreamStateStore stateStore;
public static class FetchReplica
{
public final Replica local;
// Source replica
public final Replica remote;
public FetchReplica(Replica local, Replica remote)
{
Preconditions.checkNotNull(local);
Preconditions.checkNotNull(remote);
assert local.isSelf() && !remote.isSelf();
this.local = local;
this.remote = remote;
}
public String toString()
{
return "FetchReplica{" +
"local=" + local +
", remote=" + remote +
'}';
}
public boolean equals(Object o)
{
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
FetchReplica that = (FetchReplica) o;
if (!local.equals(that.local)) return false;
return remote.equals(that.remote);
}
public int hashCode()
{
int result = local.hashCode();
result = 31 * result + remote.hashCode();
return result;
}
}
public interface SourceFilter extends Predicate<Replica>
{
public boolean apply(Replica replica);
public String message(Replica replica);
}
/**
* Source filter which excludes any endpoints that are not alive according to a
* failure detector.
*/
public static class FailureDetectorSourceFilter implements SourceFilter
{
private final IFailureDetector fd;
public FailureDetectorSourceFilter(IFailureDetector fd)
{
this.fd = fd;
}
@Override
public boolean apply(Replica replica)
{
return fd.isAlive(replica.endpoint());
}
@Override
public String message(Replica replica)
{
return "Filtered " + replica + " out because it was down";
}
}
/**
* Source filter which excludes any endpoints that are not in a specific data center.
*/
public static class SingleDatacenterFilter implements SourceFilter
{
private final String sourceDc;
private final IEndpointSnitch snitch;
public SingleDatacenterFilter(IEndpointSnitch snitch, String sourceDc)
{
this.sourceDc = sourceDc;
this.snitch = snitch;
}
@Override
public boolean apply(Replica replica)
{
return snitch.getDatacenter(replica).equals(sourceDc);
}
@Override
public String message(Replica replica)
{
return "Filtered " + replica + " out because it does not belong to " + sourceDc + " datacenter";
}
}
/**
* Source filter which excludes the current node from source calculations
*/
public static class ExcludeLocalNodeFilter implements SourceFilter
{
@Override
public boolean apply(Replica replica)
{
return !replica.isSelf();
}
@Override
public String message(Replica replica)
{
return "Filtered " + replica + " out because it is local";
}
}
/**
* Source filter which only includes endpoints contained within a provided set.
*/
public static class AllowedSourcesFilter implements SourceFilter
{
private final Set<InetAddressAndPort> allowedSources;
public AllowedSourcesFilter(Set<InetAddressAndPort> allowedSources)
{
this.allowedSources = allowedSources;
}
public boolean apply(Replica replica)
{
return allowedSources.contains(replica.endpoint());
}
@Override
public String message(Replica replica)
{
return "Filtered " + replica + " out because it was not in the allowed set: " + allowedSources;
}
}
public RangeStreamer(TokenMetadata metadata,
Collection<Token> tokens,
InetAddressAndPort address,
StreamOperation streamOperation,
boolean useStrictConsistency,
IEndpointSnitch snitch,
StreamStateStore stateStore,
boolean connectSequentially,
int connectionsPerHost)
{
this(metadata, tokens, address, streamOperation, useStrictConsistency, snitch, stateStore,
FailureDetector.instance, connectSequentially, connectionsPerHost);
}
RangeStreamer(TokenMetadata metadata,
Collection<Token> tokens,
InetAddressAndPort address,
StreamOperation streamOperation,
boolean useStrictConsistency,
IEndpointSnitch snitch,
StreamStateStore stateStore,
IFailureDetector failureDetector,
boolean connectSequentially,
int connectionsPerHost)
{
Preconditions.checkArgument(streamOperation == StreamOperation.BOOTSTRAP || streamOperation == StreamOperation.REBUILD, streamOperation);
this.metadata = metadata;
this.tokens = tokens;
this.address = address;
this.description = streamOperation.getDescription();
this.streamPlan = new StreamPlan(streamOperation, connectionsPerHost, connectSequentially, null, PreviewKind.NONE);
this.useStrictConsistency = useStrictConsistency;
this.snitch = snitch;
this.stateStore = stateStore;
streamPlan.listeners(this.stateStore);
// We're _always_ filtering out a local node and down sources
addSourceFilter(new RangeStreamer.FailureDetectorSourceFilter(failureDetector));
addSourceFilter(new RangeStreamer.ExcludeLocalNodeFilter());
}
public void addSourceFilter(SourceFilter filter)
{
sourceFilters.add(filter);
}
// Creates error message from source filters
private static String buildErrorMessage(Collection<SourceFilter> sourceFilters, ReplicaCollection<?> replicas)
{
StringBuilder failureMessage = new StringBuilder();
for (Replica r : replicas)
{
for (SourceFilter filter : sourceFilters)
{
if (!filter.apply(r))
{
failureMessage.append(filter.message(r));
break;
}
}
}
return failureMessage.toString();
}
/**
* Add ranges to be streamed for given keyspace.
*
* @param keyspaceName keyspace name
* @param replicas ranges to be streamed
*/
public void addRanges(String keyspaceName, ReplicaCollection<?> replicas)
{
Keyspace keyspace = Keyspace.open(keyspaceName);
AbstractReplicationStrategy strat = keyspace.getReplicationStrategy();
if(strat instanceof LocalStrategy)
{
logger.info("Not adding ranges for Local Strategy keyspace={}", keyspaceName);
return;
}
boolean useStrictSource = useStrictSourcesForRanges(strat);
EndpointsByReplica fetchMap = calculateRangesToFetchWithPreferredEndpoints(replicas, keyspace, useStrictSource);
for (Map.Entry<Replica, Replica> entry : fetchMap.flattenEntries())
logger.info("{}: range {} exists on {} for keyspace {}", description, entry.getKey(), entry.getValue(), keyspaceName);
Multimap<InetAddressAndPort, FetchReplica> workMap;
//Only use the optimized strategy if we don't care about strict sources, have a replication factor > 1, and no
//transient replicas.
if (useStrictSource || strat == null || strat.getReplicationFactor().allReplicas == 1 || strat.getReplicationFactor().hasTransientReplicas())
{
workMap = convertPreferredEndpointsToWorkMap(fetchMap);
}
else
{
workMap = getOptimizedWorkMap(fetchMap, sourceFilters, keyspaceName);
}
if (toFetch.put(keyspaceName, workMap) != null)
throw new IllegalArgumentException("Keyspace is already added to fetch map");
if (logger.isTraceEnabled())
{
for (Map.Entry<InetAddressAndPort, Collection<FetchReplica>> entry : workMap.asMap().entrySet())
{
for (FetchReplica r : entry.getValue())
logger.trace("{}: range source {} local range {} for keyspace {}", description, r.remote, r.local, keyspaceName);
}
}
}
/**
* @param strat AbstractReplicationStrategy of keyspace to check
* @return true when the node is bootstrapping, useStrictConsistency is true and # of nodes in the cluster is more than # of replica
*/
private boolean useStrictSourcesForRanges(AbstractReplicationStrategy strat)
{
boolean res = useStrictConsistency && tokens != null;
if (res)
{
int nodes = 0;
if (strat instanceof NetworkTopologyStrategy)
{
ImmutableMultimap<String, InetAddressAndPort> dc2Nodes = metadata.getDC2AllEndpoints(snitch);
NetworkTopologyStrategy ntps = (NetworkTopologyStrategy) strat;
for (String dc : dc2Nodes.keySet())
nodes += ntps.getReplicationFactor(dc).allReplicas > 0 ? dc2Nodes.get(dc).size() : 0;
}
else
nodes = metadata.getSizeOfAllEndpoints();
res = nodes > strat.getReplicationFactor().allReplicas;
}
return res;
}
/**
* Wrapper method to assemble the arguments for invoking the implementation with RangeStreamer's parameters
*/
private EndpointsByReplica calculateRangesToFetchWithPreferredEndpoints(ReplicaCollection<?> fetchRanges, Keyspace keyspace, boolean useStrictConsistency)
{
AbstractReplicationStrategy strat = keyspace.getReplicationStrategy();
TokenMetadata tmd = metadata.cloneOnlyTokenMap();
TokenMetadata tmdAfter = null;
if (tokens != null)
{
// Pending ranges
tmdAfter = tmd.cloneOnlyTokenMap();
tmdAfter.updateNormalTokens(tokens, address);
}
else if (useStrictConsistency)
{
throw new IllegalArgumentException("Can't ask for strict consistency and not supply tokens");
}
return calculateRangesToFetchWithPreferredEndpoints(snitch::sortedByProximity,
strat,
fetchRanges,
useStrictConsistency,
tmd,
tmdAfter,
keyspace.getName(),
sourceFilters);
}
/**
* Get a map of all ranges and the source that will be cleaned up once this bootstrapped node is added for the given ranges.
* For each range, the list should only contain a single source. This allows us to consistently migrate data without violating
* consistency.
**/
public static EndpointsByReplica
calculateRangesToFetchWithPreferredEndpoints(BiFunction<InetAddressAndPort, EndpointsForRange, EndpointsForRange> snitchGetSortedListByProximity,
AbstractReplicationStrategy strat,
ReplicaCollection<?> fetchRanges,
boolean useStrictConsistency,
TokenMetadata tmdBefore,
TokenMetadata tmdAfter,
String keyspace,
Collection<SourceFilter> sourceFilters)
{
EndpointsByRange rangeAddresses = strat.getRangeAddresses(tmdBefore);
InetAddressAndPort localAddress = FBUtilities.getBroadcastAddressAndPort();
logger.debug ("Keyspace: {}", keyspace);
logger.debug("To fetch RN: {}", fetchRanges);
logger.debug("Fetch ranges: {}", rangeAddresses);
Predicate<Replica> testSourceFilters = and(sourceFilters);
Function<EndpointsForRange, EndpointsForRange> sorted =
endpoints -> snitchGetSortedListByProximity.apply(localAddress, endpoints);
//This list of replicas is just candidates. With strict consistency it's going to be a narrow list.
EndpointsByReplica.Builder rangesToFetchWithPreferredEndpoints = new EndpointsByReplica.Builder();
for (Replica toFetch : fetchRanges)
{
//Replica that is sufficient to provide the data we need
//With strict consistency and transient replication we may end up with multiple types
//so this isn't used with strict consistency
Predicate<Replica> isSufficient = r -> toFetch.isTransient() || r.isFull();
logger.debug("To fetch {}", toFetch);
for (Range<Token> range : rangeAddresses.keySet())
{
if (!range.contains(toFetch.range()))
continue;
final EndpointsForRange oldEndpoints = sorted.apply(rangeAddresses.get(range));
//Ultimately we populate this with whatever is going to be fetched from to satisfy toFetch
//It could be multiple endpoints and we must fetch from all of them if they are there
//With transient replication and strict consistency this is to get the full data from a full replica and
//transient data from the transient replica losing data
EndpointsForRange sources;
//Due to CASSANDRA-5953 we can have a higher RF than we have endpoints.
//So we need to be careful to only be strict when endpoints == RF
boolean isStrictConsistencyApplicable = useStrictConsistency && (oldEndpoints.size() == strat.getReplicationFactor().allReplicas);
if (isStrictConsistencyApplicable)
{
EndpointsForRange strictEndpoints;
//Start with two sets of who replicates the range before and who replicates it after
EndpointsForRange newEndpoints = strat.calculateNaturalReplicas(toFetch.range().right, tmdAfter);
logger.debug("Old endpoints {}", oldEndpoints);
logger.debug("New endpoints {}", newEndpoints);
// Remove new endpoints from old endpoints based on address
strictEndpoints = oldEndpoints.without(newEndpoints.endpoints());
if (strictEndpoints.size() > 1)
throw new AssertionError("Expected <= 1 endpoint but found " + strictEndpoints);
//We have to check the source filters here to see if they will remove any replicas
//required for strict consistency
if (!all(strictEndpoints, testSourceFilters))
throw new IllegalStateException("Necessary replicas for strict consistency were removed by source filters: " + buildErrorMessage(sourceFilters, strictEndpoints));
//If we are transitioning from transient to full and and the set of replicas for the range is not changing
//we might end up with no endpoints to fetch from by address. In that case we can pick any full replica safely
//since we are already a transient replica and the existing replica remains.
//The old behavior where we might be asked to fetch ranges we don't need shouldn't occur anymore.
//So it's an error if we don't find what we need.
if (strictEndpoints.isEmpty() && toFetch.isTransient())
throw new AssertionError("If there are no endpoints to fetch from then we must be transitioning from transient to full for range " + toFetch);
if (!any(strictEndpoints, isSufficient))
{
// need an additional replica; include all our filters, to ensure we include a matching node
Optional<Replica> fullReplica = Iterables.<Replica>tryFind(oldEndpoints, and(isSufficient, testSourceFilters)).toJavaUtil();
if (fullReplica.isPresent())
strictEndpoints = Endpoints.concat(strictEndpoints, EndpointsForRange.of(fullReplica.get()));
else
throw new IllegalStateException("Couldn't find any matching sufficient replica out of " + buildErrorMessage(sourceFilters, oldEndpoints));
}
sources = strictEndpoints;
}
else
{
//Without strict consistency we have given up on correctness so no point in fetching from
//a random full + transient replica since it's also likely to lose data
//Also apply testSourceFilters that were given to us so we can safely select a single source
sources = sorted.apply(oldEndpoints.filter(and(isSufficient, testSourceFilters)));
//Limit it to just the first possible source, we don't need more than one and downstream
//will fetch from every source we supply
sources = sources.size() > 0 ? sources.subList(0, 1) : sources;
}
// storing range and preferred endpoint set
rangesToFetchWithPreferredEndpoints.putAll(toFetch, sources, Conflict.NONE);
logger.debug("Endpoints to fetch for {} are {}", toFetch, sources);
}
EndpointsForRange addressList = rangesToFetchWithPreferredEndpoints.getIfPresent(toFetch);
if (addressList == null)
throw new IllegalStateException("Failed to find endpoints to fetch " + toFetch);
/*
* When we move forwards (shrink our bucket) we are the one losing a range and no one else loses
* from that action (we also don't gain). When we move backwards there are two people losing a range. One is a full replica
* and the other is a transient replica. So we must need fetch from two places in that case for the full range we gain.
* For a transient range we only need to fetch from one.
*/
if (useStrictConsistency && addressList.size() > 1 && (addressList.filter(Replica::isFull).size() > 1 || addressList.filter(Replica::isTransient).size() > 1))
throw new IllegalStateException(String.format("Multiple strict sources found for %s, sources: %s", toFetch, addressList));
//We must have enough stuff to fetch from
if (!any(addressList, isSufficient))
{
if (strat.getReplicationFactor().allReplicas == 1)
{
if (useStrictConsistency)
{
logger.warn("A node required to move the data consistently is down");
throw new IllegalStateException("Unable to find sufficient sources for streaming range " + toFetch + " in keyspace " + keyspace + " with RF=1. " +
"Ensure this keyspace contains replicas in the source datacenter.");
}
else
logger.warn("Unable to find sufficient sources for streaming range {} in keyspace {} with RF=1. " +
"Keyspace might be missing data.", toFetch, keyspace);
}
else
{
if (useStrictConsistency)
logger.warn("A node required to move the data consistently is down");
throw new IllegalStateException("Unable to find sufficient sources for streaming range " + toFetch + " in keyspace " + keyspace);
}
}
}
return rangesToFetchWithPreferredEndpoints.build();
}
/**
* The preferred endpoint list is the wrong format because it is keyed by Replica (this node) rather than the source
* endpoint we will fetch from which streaming wants.
*/
public static Multimap<InetAddressAndPort, FetchReplica> convertPreferredEndpointsToWorkMap(EndpointsByReplica preferredEndpoints)
{
Multimap<InetAddressAndPort, FetchReplica> workMap = HashMultimap.create();
for (Map.Entry<Replica, EndpointsForRange> e : preferredEndpoints.entrySet())
{
for (Replica source : e.getValue())
{
assert (e.getKey()).isSelf();
assert !source.isSelf();
workMap.put(source.endpoint(), new FetchReplica(e.getKey(), source));
}
}
logger.debug("Work map {}", workMap);
return workMap;
}
/**
* Optimized version that also outputs the final work map
*/
private static Multimap<InetAddressAndPort, FetchReplica> getOptimizedWorkMap(EndpointsByReplica rangesWithSources,
Collection<SourceFilter> sourceFilters,
String keyspace)
{
//For now we just aren't going to use the optimized range fetch map with transient replication to shrink
//the surface area to test and introduce bugs.
//In the future it's possible we could run it twice once for full ranges with only full replicas
//and once with transient ranges and all replicas. Then merge the result.
EndpointsByRange.Builder unwrapped = new EndpointsByRange.Builder();
for (Map.Entry<Replica, Replica> entry : rangesWithSources.flattenEntries())
{
Replicas.temporaryAssertFull(entry.getValue());
unwrapped.put(entry.getKey().range(), entry.getValue());
}
EndpointsByRange unwrappedView = unwrapped.build();
RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(unwrappedView, sourceFilters, keyspace);
Multimap<InetAddressAndPort, Range<Token>> rangeFetchMapMap = calculator.getRangeFetchMap();
logger.info("Output from RangeFetchMapCalculator for keyspace {}", keyspace);
validateRangeFetchMap(unwrappedView, rangeFetchMapMap, keyspace);
//Need to rewrap as Replicas
Multimap<InetAddressAndPort, FetchReplica> wrapped = HashMultimap.create();
for (Map.Entry<InetAddressAndPort, Range<Token>> entry : rangeFetchMapMap.entries())
{
Replica toFetch = null;
for (Replica r : rangesWithSources.keySet())
{
if (r.range().equals(entry.getValue()))
{
if (toFetch != null)
throw new AssertionError(String.format("There shouldn't be multiple replicas for range %s, replica %s and %s here", r.range(), r, toFetch));
toFetch = r;
}
}
if (toFetch == null)
throw new AssertionError("Shouldn't be possible for the Replica we fetch to be null here");
//Committing the cardinal sin of synthesizing a Replica, but it's ok because we assert earlier all of them
//are full and optimized range fetch map doesn't support transient replication yet.
wrapped.put(entry.getKey(), new FetchReplica(toFetch, fullReplica(entry.getKey(), entry.getValue())));
}
return wrapped;
}
/**
* Verify that source returned for each range is correct
*/
@VisibleForTesting
static void validateRangeFetchMap(EndpointsByRange rangesWithSources, Multimap<InetAddressAndPort, Range<Token>> rangeFetchMapMap, String keyspace)
{
for (Map.Entry<InetAddressAndPort, Range<Token>> entry : rangeFetchMapMap.entries())
{
if(entry.getKey().equals(FBUtilities.getBroadcastAddressAndPort()))
{
throw new IllegalStateException("Trying to stream locally. Range: " + entry.getValue()
+ " in keyspace " + keyspace);
}
if (!rangesWithSources.get(entry.getValue()).endpoints().contains(entry.getKey()))
{
throw new IllegalStateException("Trying to stream from wrong endpoint. Range: " + entry.getValue()
+ " in keyspace " + keyspace + " from endpoint: " + entry.getKey());
}
logger.info("Streaming range {} from endpoint {} for keyspace {}", entry.getValue(), entry.getKey(), keyspace);
}
}
// For testing purposes
@VisibleForTesting
Map<String, Multimap<InetAddressAndPort, FetchReplica>> toFetch()
{
return toFetch;
}
public StreamResultFuture fetchAsync()
{
toFetch.forEach((keyspace, sources) -> {
logger.debug("Keyspace {} Sources {}", keyspace, sources);
sources.asMap().forEach((source, fetchReplicas) -> {
// filter out already streamed ranges
SystemKeyspace.AvailableRanges available = stateStore.getAvailableRanges(keyspace, metadata.partitioner);
Predicate<FetchReplica> isAvailable = fetch -> {
boolean isInFull = available.full.contains(fetch.local.range());
boolean isInTrans = available.trans.contains(fetch.local.range());
if (!isInFull && !isInTrans)
//Range is unavailable
return false;
if (fetch.local.isFull())
//For full, pick only replicas with matching transientness
return isInFull == fetch.remote.isFull();
// Any transient or full will do
return true;
};
List<FetchReplica> remaining = fetchReplicas.stream().filter(not(isAvailable)).collect(Collectors.toList());
if (remaining.size() < available.full.size() + available.trans.size())
{
List<FetchReplica> skipped = fetchReplicas.stream().filter(isAvailable).collect(Collectors.toList());
logger.info("Some ranges of {} are already available. Skipping streaming those ranges. Skipping {}. Fully available {} Transiently available {}",
fetchReplicas, skipped, available.full, available.trans);
}
if (logger.isTraceEnabled())
logger.trace("{}ing from {} ranges {}", description, source, StringUtils.join(remaining, ", "));
InetAddressAndPort self = FBUtilities.getBroadcastAddressAndPort();
RangesAtEndpoint full = remaining.stream()
.filter(pair -> pair.remote.isFull())
.map(pair -> pair.local)
.collect(RangesAtEndpoint.collector(self));
RangesAtEndpoint transientReplicas = remaining.stream()
.filter(pair -> pair.remote.isTransient())
.map(pair -> pair.local)
.collect(RangesAtEndpoint.collector(self));
logger.debug("Source and our replicas {}", fetchReplicas);
logger.debug("Source {} Keyspace {} streaming full {} transient {}", source, keyspace, full, transientReplicas);
/* Send messages to respective folks to stream data over to me */
streamPlan.requestRanges(source, keyspace, full, transientReplicas);
});
});
return streamPlan.execute();
}
}