blob: 6401d2a06e93ef2fab6c24e22e5f559d9cb2af85 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.locator;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collector;
import static com.google.common.collect.Iterables.all;
import static org.apache.cassandra.locator.ReplicaCollection.Builder.Conflict.*;
/**
* A ReplicaCollection of the Ranges replicated at a single endpoint. Every Replica must be for that same endpoint,
* and each must have a unique Range (overlapping ranges are presently permitted, though they probably should not be).
*/
public class RangesAtEndpoint extends AbstractReplicaCollection<RangesAtEndpoint>
{
/**
 * Builds a {@link ReplicaMap} over the given list, keyed by each Replica's range.
 */
private static ReplicaMap<Range<Token>> rangeMap(ReplicaList list)
{
    return new ReplicaMap<>(list, replica -> replica.range());
}
// Range map built once over EMPTY_LIST; handed to every collection created by empty()
private static final ReplicaMap<Range<Token>> EMPTY_MAP = rangeMap(EMPTY_LIST);
// The endpoint supplied at construction (asserted non-null there); exposed via endpoint()
private final InetAddressAndPort endpoint;
// volatile not needed, as all of these caching collections have final members,
// besides (transitively) those that cache objects that themselves have only final members
// Range -> Replica index; may be null after construction, in which case byRange() builds it on first use
private ReplicaMap<Range<Token>> byRange;
// Cached result of onlyFull(); null until first requested
private RangesAtEndpoint onlyFull;
// Cached result of onlyTransient(); null until first requested
private RangesAtEndpoint onlyTransient;
/**
 * @param endpoint the endpoint this collection is for; must not be null (checked by assertion only)
 * @param list the replicas backing this collection
 * @param byRange a map of the replicas keyed by range, or null to have {@link #byRange()} build it on first use
 */
private RangesAtEndpoint(InetAddressAndPort endpoint, ReplicaList list, ReplicaMap<Range<Token>> byRange)
{
    super(list);
    assert endpoint != null;
    this.byRange = byRange;
    this.endpoint = endpoint;
}
/**
 * @return the endpoint this collection was created for
 */
public InetAddressAndPort endpoint()
{
    return this.endpoint;
}
/**
 * @return the endpoints present in this collection: an empty set when the collection holds no replicas,
 * otherwise a single-element set containing {@link #endpoint()}. The returned set is immutable.
 */
@Override
public Set<InetAddressAndPort> endpoints()
{
    // Collections.emptySet() and Collections.singleton() both return immutable sets,
    // so the result can be returned directly without any further wrapping
    return list.isEmpty()
           ? Collections.emptySet()
           : Collections.singleton(endpoint);
}
/**
 * @return the set of all unique Ranges in this collection (the key set of {@link #byRange()})
 * This method is threadsafe, though it is not synchronised
 */
public Set<Range<Token>> ranges()
{
    Map<Range<Token>, Replica> replicasByRange = byRange();
    return replicasByRange.keySet();
}
/**
 * @return a map of all Ranges, to their owning Replica instance; built from the backing list
 * on first use if it was not supplied at construction, and cached thereafter
 * This method is threadsafe, though it is not synchronised
 */
public Map<Range<Token>, Replica> byRange()
{
    ReplicaMap<Range<Token>> cached = byRange;
    if (cached != null)
        return cached;

    cached = rangeMap(list);
    byRange = cached;
    return cached;
}
/**
 * Creates a collection for the same endpoint over {@code newList}.
 *
 * An empty {@code newList} yields {@link #empty(InetAddressAndPort)}. Otherwise, if a range map has
 * already been built and {@code list.isSubList(newList)} holds, the new collection gets a map derived
 * via {@code forSubList}; if not, its map is left null and built on demand.
 */
@Override
protected RangesAtEndpoint snapshot(ReplicaList newList)
{
    if (newList.isEmpty())
        return empty(endpoint);

    ReplicaMap<Range<Token>> subMap = (this.byRange != null && list.isSubList(newList))
                                      ? this.byRange.forSubList(newList)
                                      : null;
    return new RangesAtEndpoint(endpoint, newList, subMap);
}
/**
 * @return this instance itself; the nested Builder overrides this method to return
 * a separate collection over its current contents instead
 */
@Override
public RangesAtEndpoint snapshot()
{
    return this;
}
/**
 * @param initialCapacity the capacity to allocate for the new builder's backing list
 * @return a new, empty Builder for the same endpoint as this collection
 */
@Override
public ReplicaCollection.Builder<RangesAtEndpoint> newBuilder(int initialCapacity)
{
    return builder(endpoint, initialCapacity);
}
/**
 * @return true iff {@code replica} is non-null and the Replica stored for its range equals it
 */
@Override
public boolean contains(Replica replica)
{
    if (replica == null)
        return false;

    Replica existing = byRange().get(replica.range());
    return Objects.equals(existing, replica);
}
/**
 * @return the subset of this collection matching {@code Replica::isFull};
 * computed by {@code filter} on first call and cached afterwards
 */
public RangesAtEndpoint onlyFull()
{
    RangesAtEndpoint cached = onlyFull;
    if (cached != null)
        return cached;

    cached = filter(Replica::isFull);
    onlyFull = cached;
    return cached;
}
/**
 * @return the subset of this collection matching {@code Replica::isTransient};
 * computed by {@code filter} on first call and cached afterwards
 */
public RangesAtEndpoint onlyTransient()
{
    RangesAtEndpoint cached = onlyTransient;
    if (cached != null)
        return cached;

    cached = filter(Replica::isTransient);
    onlyTransient = cached;
    return cached;
}
/**
 * @param range the exact range to look up
 * @param isFull the value {@code Replica.isFull()} must report for a match
 * @return true iff a Replica is stored for {@code range} and its {@code isFull()} equals {@code isFull}
 */
public boolean contains(Range<Token> range, boolean isFull)
{
    Replica existing = byRange().get(range);
    if (existing == null)
        return false;

    return existing.isFull() == isFull;
}
/**
 * Replaces any wrap-around range with its unwrapped sub-ranges. At most one wrap-around range is
 * expected; this is checked by assertion only.
 *
 * @return if there are no wrap around ranges contained in this RangesAtEndpoint, the result of {@link #snapshot()};
 * otherwise, a RangesAtEndpoint covering the same logical portions of the ring, but with those ranges unwrapped
 */
public RangesAtEndpoint unwrap()
{
    int wrapped = 0;
    for (Replica candidate : this)
    {
        if (candidate.range().isWrapAround())
            wrapped++;
    }

    assert wrapped <= 1;
    if (wrapped == 0)
        return snapshot();

    // each wrap-around range may contribute one extra entry once it has been split
    RangesAtEndpoint.Builder unwrapped = builder(endpoint, size() + wrapped);
    for (Replica candidate : this)
    {
        if (candidate.range().isWrapAround())
        {
            for (Range<Token> part : candidate.range().unwrap())
                unwrapped.add(candidate.decorateSubrange(part));
        }
        else
        {
            unwrapped.add(candidate);
        }
    }
    return unwrapped.build();
}
/**
 * @param endpoint the endpoint every collected Replica is expected to belong to
 * @return a Collector that gathers Replicas into a fresh {@link Builder} for {@code endpoint};
 * created via the two-argument {@code collector} helper with no Collector characteristics
 */
public static Collector<Replica, Builder, RangesAtEndpoint> collector(InetAddressAndPort endpoint)
{
    return collector(ImmutableSet.of(), () -> builder(endpoint));
}
/**
 * A mutable RangesAtEndpoint that accumulates Replicas for one endpoint. Once {@link #build()}
 * has been called, any further {@code add} is rejected with an IllegalStateException.
 */
public static class Builder extends RangesAtEndpoint implements ReplicaCollection.Builder<RangesAtEndpoint>
{
    // set by build(); checked at the top of add(Replica, Conflict)
    boolean built;

    public Builder(InetAddressAndPort endpoint)
    {
        this(endpoint, 0);
    }

    public Builder(InetAddressAndPort endpoint, int capacity)
    {
        this(endpoint, new ReplicaList(capacity));
    }

    // the range map is created eagerly so add() can update it alongside the list
    private Builder(InetAddressAndPort endpoint, ReplicaList list)
    {
        super(endpoint, list, rangeMap(list));
    }

    /**
     * Adds a replica using {@code Conflict.DUPLICATE} conflict handling.
     */
    public RangesAtEndpoint.Builder add(Replica replica)
    {
        return add(replica, Conflict.DUPLICATE);
    }

    /**
     * Adds a replica, provided the range map accepts it.
     *
     * If the map rejects the replica, it is not added and {@code ignoreConflict} decides the outcome:
     * DUPLICATE tolerates it only when the Replica already stored for that range equals the new one,
     * NONE never tolerates it, and ALL always does.
     *
     * @throws IllegalStateException if {@link #build()} has already been called
     * @throws NullPointerException if {@code replica} is null
     * @throws IllegalArgumentException if the replica's endpoint differs from this builder's,
     * or on a conflict that {@code ignoreConflict} does not tolerate
     */
    public RangesAtEndpoint.Builder add(Replica replica, Conflict ignoreConflict)
    {
        if (built)
            throw new IllegalStateException();
        Preconditions.checkNotNull(replica);
        if (!Objects.equals(super.endpoint, replica.endpoint()))
            throw new IllegalArgumentException("Replica " + replica + " has incorrect endpoint (expected " + super.endpoint + ")");

        // the map is updated first, with the index the replica is about to occupy in the list
        if (super.byRange.internalPutIfAbsent(replica, list.size()))
        {
            list.add(replica);
            return this;
        }

        switch (ignoreConflict)
        {
            case DUPLICATE:
                if (byRange().get(replica.range()).equals(replica))
                    break;
                // not an exact duplicate: deliberately fall through and treat as a hard conflict
            case NONE:
                throw new IllegalArgumentException("Conflicting replica added (expected unique ranges): "
                                                   + replica + "; existing: " + byRange().get(replica.range()));
            case ALL:
                break;
        }
        return this;
    }

    /**
     * @return a collection over a sub-list view spanning the builder's current contents
     */
    @Override
    public RangesAtEndpoint snapshot()
    {
        int currentSize = list.size();
        return snapshot(list.subList(0, currentSize));
    }

    /**
     * Marks this builder as built and returns a RangesAtEndpoint that shares
     * the builder's list and range map.
     */
    public RangesAtEndpoint build()
    {
        built = true;
        return new RangesAtEndpoint(super.endpoint, super.list, super.byRange);
    }
}
/**
 * @return a new, empty Builder for {@code endpoint} with an initial capacity of zero
 */
public static Builder builder(InetAddressAndPort endpoint)
{
    return builder(endpoint, 0);
}

/**
 * @return a new, empty Builder for {@code endpoint} whose backing list is created with {@code capacity}
 */
public static Builder builder(InetAddressAndPort endpoint, int capacity)
{
    return new Builder(endpoint, capacity);
}
/**
 * @return a RangesAtEndpoint for {@code endpoint} backed by the shared EMPTY_LIST and EMPTY_MAP
 */
public static RangesAtEndpoint empty(InetAddressAndPort endpoint)
{
    return new RangesAtEndpoint(endpoint, EMPTY_LIST, EMPTY_MAP);
}
/**
 * @return a RangesAtEndpoint containing only {@code replica}, for that replica's endpoint
 */
public static RangesAtEndpoint of(Replica replica)
{
    ReplicaList single = new ReplicaList(1);
    single.add(replica);
    InetAddressAndPort owner = replica.endpoint();
    ReplicaMap<Range<Token>> index = rangeMap(single);
    return new RangesAtEndpoint(owner, single, index);
}
/**
 * Varargs convenience for {@link #copyOf(List)}.
 *
 * @throws IllegalArgumentException if no replicas are given
 */
public static RangesAtEndpoint of(Replica ... replicas)
{
    List<Replica> asList = Arrays.asList(replicas);
    return copyOf(asList);
}
/**
 * Builds a RangesAtEndpoint from the given replicas, using the first replica's endpoint
 * as the endpoint of the collection.
 *
 * @throws IllegalArgumentException if {@code replicas} is empty
 */
public static RangesAtEndpoint copyOf(List<Replica> replicas)
{
    if (replicas.isEmpty())
        throw new IllegalArgumentException("Must specify a non-empty collection of replicas");

    InetAddressAndPort owner = replicas.get(0).endpoint();
    return builder(owner, replicas.size()).addAll(replicas).build();
}
/**
 * Use of this method to synthesize Replicas is almost always wrong. In repair it turns out the concerns of transient
 * vs non-transient are handled at a higher level, but eventually repair needs to ask streaming to actually move
 * the data and at that point it doesn't have a great handle on what the replicas are and it doesn't really matter.
 *
 * Streaming expects to be given Replicas with each replica indicating what type of data (transient or not transient)
 * should be sent.
 *
 * So in this one instance we can lie to streaming and pretend all the replicas are full and use a dummy address
 * and it doesn't matter because streaming doesn't rely on the address for anything other than debugging and full
 * is a valid value for transientness because streaming is selecting candidate tables from the repair/unrepaired
 * set already.
 *
 * @param ranges the ranges to wrap, one Replica per range
 * @return a RangesAtEndpoint for the dummy endpoint "0.0.0.0" port 0, holding one Replica per given range
 */
@VisibleForTesting
public static RangesAtEndpoint toDummyList(Collection<Range<Token>> ranges)
{
    InetAddressAndPort dummy = dummyEndpoint();
    //For repair we are less concerned with full vs transient since repair is already dealing with those concerns.
    //Always say full and then if the repair is incremental or not will determine what is streamed.
    return ranges.stream()
                 .map(range -> new Replica(dummy, range, true))
                 .collect(collector(dummy));
}

// Resolves the placeholder endpoint used by toDummyList, rethrowing a lookup failure unchecked
private static InetAddressAndPort dummyEndpoint()
{
    try
    {
        return InetAddressAndPort.getByNameOverrideDefaults("0.0.0.0", 0);
    }
    catch (UnknownHostException e)
    {
        throw new RuntimeException(e);
    }
}
/**
 * @return true iff every Replica in {@code ranges} has an endpoint whose {@code getHostAddress(true)}
 * is "0.0.0.0:0"; an empty collection therefore also yields true
 */
public static boolean isDummyList(RangesAtEndpoint ranges)
{
    for (Replica replica : ranges)
    {
        if (!replica.endpoint().getHostAddress(true).equals("0.0.0.0:0"))
            return false;
    }
    return true;
}
/**
 * Concatenates two DISJOINT collections, delegating to {@code AbstractReplicaCollection.concat}
 * with the {@code NONE} conflict setting.
 *
 * @return the concatenation of {@code replicas} and {@code extraReplicas}
 */
public static RangesAtEndpoint concat(RangesAtEndpoint replicas, RangesAtEndpoint extraReplicas)
{
    RangesAtEndpoint combined = AbstractReplicaCollection.concat(replicas, extraReplicas, NONE);
    return combined;
}
}