| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.cassandra.locator; |
| |
| import com.google.common.collect.Iterables; |
| import org.apache.cassandra.db.ConsistencyLevel; |
| import org.apache.cassandra.db.Keyspace; |
| import org.apache.cassandra.db.PartitionPosition; |
| import org.apache.cassandra.dht.AbstractBounds; |
| |
| import java.util.function.Predicate; |
| |
| public abstract class ReplicaPlan<E extends Endpoints<E>> |
| { |
| protected final Keyspace keyspace; |
| protected final ConsistencyLevel consistencyLevel; |
| // The snapshot of the replication strategy when instantiating. |
| // It could be different than the one fetched from Keyspace later, e.g. RS altered during the query. |
| // Use the snapshot to calculate {@code blockFor} in order to have a consistent view of RS for the query. |
| protected final AbstractReplicationStrategy replicationStrategy; |
| |
| // all nodes we will contact via any mechanism, including hints |
| // i.e., for: |
| // - reads, only live natural replicas |
| // ==> live.natural().subList(0, blockFor + initial speculate) |
| // - writes, includes all full, and any pending replicas, (and only any necessary transient ones to make up the difference) |
| // ==> liveAndDown.natural().filter(isFull) ++ liveAndDown.pending() ++ live.natural.filter(isTransient, req) |
| // - paxos, includes all live replicas (natural+pending), for this DC if SERIAL_LOCAL |
| // ==> live.all() (if consistencyLevel.isDCLocal(), then .filter(consistencyLevel.isLocal)) |
| private final E contacts; |
| |
| ReplicaPlan(Keyspace keyspace, AbstractReplicationStrategy replicationStrategy, ConsistencyLevel consistencyLevel, E contacts) |
| { |
| assert contacts != null; |
| this.keyspace = keyspace; |
| this.replicationStrategy = replicationStrategy; |
| this.consistencyLevel = consistencyLevel; |
| this.contacts = contacts; |
| } |
| |
| public abstract int blockFor(); |
| |
| public E contacts() { return contacts; } |
| |
| // TODO: should this semantically return true if we contain the endpoint, not the exact replica? |
| public boolean contacts(Replica replica) { return contacts.contains(replica); } |
| public Keyspace keyspace() { return keyspace; } |
| public AbstractReplicationStrategy replicationStrategy() { return replicationStrategy; } |
| public ConsistencyLevel consistencyLevel() { return consistencyLevel; } |
| |
| public static abstract class ForRead<E extends Endpoints<E>> extends ReplicaPlan<E> |
| { |
| // all nodes we *could* contacts; typically all natural replicas that are believed to be alive |
| // we will consult this collection to find uncontacted nodes we might contact if we doubt we will meet consistency level |
| private final E candidates; |
| |
| ForRead(Keyspace keyspace, AbstractReplicationStrategy replicationStrategy, ConsistencyLevel consistencyLevel, E candidates, E contacts) |
| { |
| super(keyspace, replicationStrategy, consistencyLevel, contacts); |
| this.candidates = candidates; |
| } |
| |
| public int blockFor() { return consistencyLevel.blockFor(replicationStrategy); } |
| |
| public E candidates() { return candidates; } |
| |
| public Replica firstUncontactedCandidate(Predicate<Replica> extraPredicate) |
| { |
| return Iterables.tryFind(candidates(), r -> extraPredicate.test(r) && !contacts(r)).orNull(); |
| } |
| |
| public Replica lookup(InetAddressAndPort endpoint) |
| { |
| return candidates().byEndpoint().get(endpoint); |
| } |
| |
| public String toString() |
| { |
| return "ReplicaPlan.ForRead [ CL: " + consistencyLevel + " keyspace: " + keyspace + " candidates: " + candidates + " contacts: " + contacts() + " ]"; |
| } |
| } |
| |
| public static class ForTokenRead extends ForRead<EndpointsForToken> |
| { |
| public ForTokenRead(Keyspace keyspace, |
| AbstractReplicationStrategy replicationStrategy, |
| ConsistencyLevel consistencyLevel, |
| EndpointsForToken candidates, |
| EndpointsForToken contacts) |
| { |
| super(keyspace, replicationStrategy, consistencyLevel, candidates, contacts); |
| } |
| |
| ForTokenRead withContact(EndpointsForToken newContact) |
| { |
| return new ForTokenRead(keyspace, replicationStrategy, consistencyLevel, candidates(), newContact); |
| } |
| } |
| |
| public static class ForRangeRead extends ForRead<EndpointsForRange> |
| { |
| final AbstractBounds<PartitionPosition> range; |
| final int vnodeCount; |
| |
| public ForRangeRead(Keyspace keyspace, |
| AbstractReplicationStrategy replicationStrategy, |
| ConsistencyLevel consistencyLevel, |
| AbstractBounds<PartitionPosition> range, |
| EndpointsForRange candidates, |
| EndpointsForRange contact, |
| int vnodeCount) |
| { |
| super(keyspace, replicationStrategy, consistencyLevel, candidates, contact); |
| this.range = range; |
| this.vnodeCount = vnodeCount; |
| } |
| |
| public AbstractBounds<PartitionPosition> range() { return range; } |
| |
| /** |
| * @return number of vnode ranges covered by the range |
| */ |
| public int vnodeCount() { return vnodeCount; } |
| |
| ForRangeRead withContact(EndpointsForRange newContact) |
| { |
| return new ForRangeRead(keyspace, replicationStrategy, consistencyLevel, range, candidates(), newContact, vnodeCount); |
| } |
| } |
| |
| public static abstract class ForWrite<E extends Endpoints<E>> extends ReplicaPlan<E> |
| { |
| // TODO: this is only needed because of poor isolation of concerns elsewhere - we can remove it soon, and will do so in a follow-up patch |
| final E pending; |
| final E liveAndDown; |
| final E live; |
| |
| ForWrite(Keyspace keyspace, AbstractReplicationStrategy replicationStrategy, ConsistencyLevel consistencyLevel, E pending, E liveAndDown, E live, E contact) |
| { |
| super(keyspace, replicationStrategy, consistencyLevel, contact); |
| this.pending = pending; |
| this.liveAndDown = liveAndDown; |
| this.live = live; |
| } |
| |
| public int blockFor() { return consistencyLevel.blockForWrite(replicationStrategy, pending()); } |
| |
| /** Replicas that a region of the ring is moving to; not yet ready to serve reads, but should receive writes */ |
| public E pending() { return pending; } |
| /** Replicas that can participate in the write - this always includes all nodes (pending and natural) in all DCs, except for paxos LOCAL_QUORUM (which is local DC only) */ |
| public E liveAndDown() { return liveAndDown; } |
| /** The live replicas present in liveAndDown, usually derived from FailureDetector.isReplicaAlive */ |
| public E live() { return live; } |
| /** Calculate which live endpoints we could have contacted, but chose not to */ |
| public E liveUncontacted() { return live().filter(r -> !contacts(r)); } |
| /** Test liveness, consistent with the upfront analysis done for this operation (i.e. test membership of live()) */ |
| public boolean isAlive(Replica replica) { return live.endpoints().contains(replica.endpoint()); } |
| public Replica lookup(InetAddressAndPort endpoint) |
| { |
| return liveAndDown().byEndpoint().get(endpoint); |
| } |
| |
| public String toString() |
| { |
| return "ReplicaPlan.ForWrite [ CL: " + consistencyLevel + " keyspace: " + keyspace + " liveAndDown: " + liveAndDown + " live: " + live + " contacts: " + contacts() + " ]"; |
| } |
| } |
| |
| public static class ForTokenWrite extends ForWrite<EndpointsForToken> |
| { |
| public ForTokenWrite(Keyspace keyspace, AbstractReplicationStrategy replicationStrategy, ConsistencyLevel consistencyLevel, EndpointsForToken pending, EndpointsForToken liveAndDown, EndpointsForToken live, EndpointsForToken contact) |
| { |
| super(keyspace, replicationStrategy, consistencyLevel, pending, liveAndDown, live, contact); |
| } |
| |
| private ReplicaPlan.ForTokenWrite copy(ConsistencyLevel newConsistencyLevel, EndpointsForToken newContact) |
| { |
| return new ReplicaPlan.ForTokenWrite(keyspace, replicationStrategy, newConsistencyLevel, pending(), liveAndDown(), live(), newContact); |
| } |
| |
| ForTokenWrite withConsistencyLevel(ConsistencyLevel newConsistencylevel) { return copy(newConsistencylevel, contacts()); } |
| public ForTokenWrite withContact(EndpointsForToken newContact) { return copy(consistencyLevel, newContact); } |
| } |
| |
| public static class ForPaxosWrite extends ForWrite<EndpointsForToken> |
| { |
| final int requiredParticipants; |
| |
| ForPaxosWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, EndpointsForToken pending, EndpointsForToken liveAndDown, EndpointsForToken live, EndpointsForToken contact, int requiredParticipants) |
| { |
| super(keyspace, keyspace.getReplicationStrategy(), consistencyLevel, pending, liveAndDown, live, contact); |
| this.requiredParticipants = requiredParticipants; |
| } |
| |
| public int requiredParticipants() { return requiredParticipants; } |
| } |
| |
| /** |
| * Used by AbstractReadExecutor, {Data,Digest}Resolver and ReadRepair to share a ReplicaPlan whose 'contacts' replicas |
| * we progressively modify via various forms of speculation (initial speculation, rr-read and rr-write) |
| * |
| * The internal reference is not volatile, despite being shared between threads. The initial reference provided to |
| * the constructor should be visible by the normal process of sharing data between threads (i.e. executors, etc) |
| * and any updates will either be seen or not seen, perhaps not promptly, but certainly not incompletely. |
| * The contained ReplicaPlan has only final member properties, so it cannot be seen partially initialised. |
| */ |
| public interface Shared<E extends Endpoints<E>, P extends ReplicaPlan<E>> |
| { |
| /** |
| * add the provided replica to this shared plan, by updating the internal reference |
| */ |
| public void addToContacts(Replica replica); |
| /** |
| * get the shared replica plan, non-volatile (so maybe stale) but no risk of partially initialised |
| */ |
| public P get(); |
| /** |
| * get the shared replica plan, non-volatile (so maybe stale) but no risk of partially initialised, |
| * but replace its 'contacts' with those provided |
| */ |
| public abstract P getWithContacts(E endpoints); |
| } |
| |
| public static class SharedForTokenRead implements Shared<EndpointsForToken, ForTokenRead> |
| { |
| private ForTokenRead replicaPlan; |
| SharedForTokenRead(ForTokenRead replicaPlan) { this.replicaPlan = replicaPlan; } |
| public void addToContacts(Replica replica) { replicaPlan = replicaPlan.withContact(Endpoints.append(replicaPlan.contacts(), replica)); } |
| public ForTokenRead get() { return replicaPlan; } |
| public ForTokenRead getWithContacts(EndpointsForToken newContact) { return replicaPlan.withContact(newContact); } |
| } |
| |
| public static class SharedForRangeRead implements Shared<EndpointsForRange, ForRangeRead> |
| { |
| private ForRangeRead replicaPlan; |
| SharedForRangeRead(ForRangeRead replicaPlan) { this.replicaPlan = replicaPlan; } |
| public void addToContacts(Replica replica) { replicaPlan = replicaPlan.withContact(Endpoints.append(replicaPlan.contacts(), replica)); } |
| public ForRangeRead get() { return replicaPlan; } |
| public ForRangeRead getWithContacts(EndpointsForRange newContact) { return replicaPlan.withContact(newContact); } |
| } |
| |
| public static SharedForTokenRead shared(ForTokenRead replicaPlan) { return new SharedForTokenRead(replicaPlan); } |
| public static SharedForRangeRead shared(ForRangeRead replicaPlan) { return new SharedForRangeRead(replicaPlan); } |
| |
| } |