| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.cassandra.service; |
| |
| import java.nio.ByteBuffer; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Objects; |
| import java.util.Optional; |
| import java.util.Set; |
| import java.util.UUID; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.ThreadLocalRandom; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.TimeoutException; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.atomic.AtomicLong; |
| import java.util.function.Function; |
| import java.util.stream.Collectors; |
| |
| import com.google.common.base.Preconditions; |
| import com.google.common.cache.CacheLoader; |
| import com.google.common.collect.Iterables; |
| import com.google.common.util.concurrent.Uninterruptibles; |
| |
| import org.apache.cassandra.config.Config; |
| import org.apache.cassandra.service.paxos.*; |
| import org.apache.cassandra.service.paxos.Paxos; |
| import org.apache.cassandra.utils.TimeUUID; |
| import org.apache.cassandra.utils.concurrent.CountDownLatch; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.cassandra.batchlog.Batch; |
| import org.apache.cassandra.batchlog.BatchlogManager; |
| import org.apache.cassandra.concurrent.Stage; |
| import org.apache.cassandra.config.CassandraRelevantProperties; |
| import org.apache.cassandra.config.DatabaseDescriptor; |
| import org.apache.cassandra.db.ColumnFamilyStore; |
| import org.apache.cassandra.db.ConsistencyLevel; |
| import org.apache.cassandra.db.CounterMutation; |
| import org.apache.cassandra.db.DecoratedKey; |
| import org.apache.cassandra.db.IMutation; |
| import org.apache.cassandra.db.Keyspace; |
| import org.apache.cassandra.db.MessageParams; |
| import org.apache.cassandra.db.Mutation; |
| import org.apache.cassandra.db.PartitionRangeReadCommand; |
| import org.apache.cassandra.db.ReadCommand; |
| import org.apache.cassandra.db.ReadExecutionController; |
| import org.apache.cassandra.db.ReadResponse; |
| import org.apache.cassandra.db.RejectException; |
| import org.apache.cassandra.db.SinglePartitionReadCommand; |
| import org.apache.cassandra.db.TruncateRequest; |
| import org.apache.cassandra.db.WriteType; |
| import org.apache.cassandra.db.filter.TombstoneOverwhelmingException; |
| import org.apache.cassandra.db.partitions.FilteredPartition; |
| import org.apache.cassandra.db.partitions.PartitionIterator; |
| import org.apache.cassandra.db.partitions.PartitionIterators; |
| import org.apache.cassandra.db.partitions.PartitionUpdate; |
| import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; |
| import org.apache.cassandra.db.rows.RowIterator; |
| import org.apache.cassandra.db.view.ViewUtils; |
| import org.apache.cassandra.dht.Token; |
| import org.apache.cassandra.exceptions.CasWriteTimeoutException; |
| import org.apache.cassandra.exceptions.CasWriteUnknownResultException; |
| import org.apache.cassandra.exceptions.InvalidRequestException; |
| import org.apache.cassandra.exceptions.IsBootstrappingException; |
| import org.apache.cassandra.exceptions.OverloadedException; |
| import org.apache.cassandra.exceptions.ReadAbortException; |
| import org.apache.cassandra.exceptions.ReadFailureException; |
| import org.apache.cassandra.exceptions.ReadTimeoutException; |
| import org.apache.cassandra.exceptions.RequestFailureException; |
| import org.apache.cassandra.exceptions.RequestFailureReason; |
| import org.apache.cassandra.exceptions.RequestTimeoutException; |
| import org.apache.cassandra.exceptions.UnavailableException; |
| import org.apache.cassandra.exceptions.WriteFailureException; |
| import org.apache.cassandra.exceptions.WriteTimeoutException; |
| import org.apache.cassandra.gms.FailureDetector; |
| import org.apache.cassandra.gms.Gossiper; |
| import org.apache.cassandra.hints.Hint; |
| import org.apache.cassandra.hints.HintsService; |
| import org.apache.cassandra.locator.AbstractReplicationStrategy; |
| import org.apache.cassandra.locator.EndpointsForToken; |
| import org.apache.cassandra.locator.IEndpointSnitch; |
| import org.apache.cassandra.locator.InetAddressAndPort; |
| import org.apache.cassandra.locator.Replica; |
| import org.apache.cassandra.locator.ReplicaLayout; |
| import org.apache.cassandra.locator.ReplicaPlan; |
| import org.apache.cassandra.locator.ReplicaPlans; |
| import org.apache.cassandra.locator.Replicas; |
| import org.apache.cassandra.metrics.CASClientRequestMetrics; |
| import org.apache.cassandra.metrics.DenylistMetrics; |
| import org.apache.cassandra.metrics.ReadRepairMetrics; |
| import org.apache.cassandra.metrics.StorageMetrics; |
| import org.apache.cassandra.net.ForwardingInfo; |
| import org.apache.cassandra.net.Message; |
| import org.apache.cassandra.net.MessageFlag; |
| import org.apache.cassandra.net.MessagingService; |
| import org.apache.cassandra.net.RequestCallback; |
| import org.apache.cassandra.net.Verb; |
| import org.apache.cassandra.schema.PartitionDenylist; |
| import org.apache.cassandra.schema.Schema; |
| import org.apache.cassandra.schema.SchemaConstants; |
| import org.apache.cassandra.schema.TableId; |
| import org.apache.cassandra.schema.TableMetadata; |
| import org.apache.cassandra.service.paxos.v1.PrepareCallback; |
| import org.apache.cassandra.service.paxos.v1.ProposeCallback; |
| import org.apache.cassandra.service.reads.AbstractReadExecutor; |
| import org.apache.cassandra.service.reads.ReadCallback; |
| import org.apache.cassandra.service.reads.range.RangeCommands; |
| import org.apache.cassandra.service.reads.repair.ReadRepair; |
| import org.apache.cassandra.tracing.Tracing; |
| import org.apache.cassandra.triggers.TriggerExecutor; |
| import org.apache.cassandra.utils.Clock; |
| import org.apache.cassandra.utils.FBUtilities; |
| import org.apache.cassandra.utils.MBeanWrapper; |
| import org.apache.cassandra.utils.MonotonicClock; |
| import org.apache.cassandra.utils.NoSpamLogger; |
| import org.apache.cassandra.utils.Pair; |
| import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException; |
| |
| import static com.google.common.collect.Iterables.concat; |
| import static java.util.concurrent.TimeUnit.MILLISECONDS; |
| import static java.util.concurrent.TimeUnit.NANOSECONDS; |
| import static org.apache.cassandra.db.ConsistencyLevel.SERIAL; |
| import static org.apache.cassandra.net.Message.out; |
| import static org.apache.cassandra.metrics.ClientRequestsMetricsHolder.casReadMetrics; |
| import static org.apache.cassandra.metrics.ClientRequestsMetricsHolder.casWriteMetrics; |
| import static org.apache.cassandra.metrics.ClientRequestsMetricsHolder.readMetrics; |
| import static org.apache.cassandra.metrics.ClientRequestsMetricsHolder.readMetricsForLevel; |
| import static org.apache.cassandra.metrics.ClientRequestsMetricsHolder.viewWriteMetrics; |
| import static org.apache.cassandra.metrics.ClientRequestsMetricsHolder.writeMetrics; |
| import static org.apache.cassandra.metrics.ClientRequestsMetricsHolder.writeMetricsForLevel; |
| import static org.apache.cassandra.net.NoPayload.noPayload; |
| import static org.apache.cassandra.net.Verb.*; |
| import static org.apache.cassandra.service.BatchlogResponseHandler.BatchlogCleanup; |
| import static org.apache.cassandra.service.paxos.Ballot.Flag.GLOBAL; |
| import static org.apache.cassandra.service.paxos.Ballot.Flag.LOCAL; |
| import static org.apache.cassandra.service.paxos.BallotGenerator.Global.nextBallot; |
| import static org.apache.cassandra.service.paxos.v1.PrepareVerbHandler.doPrepare; |
| import static org.apache.cassandra.service.paxos.v1.ProposeVerbHandler.doPropose; |
| import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis; |
| import static org.apache.cassandra.utils.Clock.Global.nanoTime; |
| import static org.apache.cassandra.utils.TimeUUID.Generator.nextTimeUUID; |
| import static org.apache.cassandra.utils.concurrent.CountDownLatch.newCountDownLatch; |
| import static org.apache.commons.lang3.StringUtils.join; |
| |
| public class StorageProxy implements StorageProxyMBean |
| { |
| /** JMX object name under which {@link #instance} is registered by the static initializer. */ |
| public static final String MBEAN_NAME = "org.apache.cassandra.db:type=StorageProxy"; |
| private static final Logger logger = LoggerFactory.getLogger(StorageProxy.class); |
| |
| public static final String UNREACHABLE = "UNREACHABLE"; |
| |
| // Read once, at class initialization, through CassandraRelevantProperties. |
| private static final int FAILURE_LOGGING_INTERVAL_SECONDS = CassandraRelevantProperties.FAILURE_LOGGING_INTERVAL_SECONDS.getInt(); |
| |
| // The three write performers are blank finals: they are assigned exactly once, in the static initializer. |
| private static final WritePerformer standardWritePerformer; |
| private static final WritePerformer counterWritePerformer; |
| private static final WritePerformer counterWriteOnCoordinatorPerformer; |
| |
| /** The singleton; it is created before the static initializer runs so that it can be registered as an MBean there. */ |
| public static final StorageProxy instance = new StorageProxy(); |
| |
| // Defaults to 128 per available processor. |
| private static volatile int maxHintsInProgress = FBUtilities.getAvailableProcessors() * 128; |
| |
| // Loader that supplies a fresh, zero-valued counter for an endpoint. |
| private static final CacheLoader<InetAddressAndPort, AtomicInteger> hintsInProgress = new CacheLoader<InetAddressAndPort, AtomicInteger>() |
| { |
|     @Override |
|     public AtomicInteger load(InetAddressAndPort endpoint) |
|     { |
|         return new AtomicInteger(); |
|     } |
| }; |
| |
| private static final DenylistMetrics denylistMetrics = new DenylistMetrics(); |
| |
| private static final PartitionDenylist partitionDenylist = new PartitionDenylist(); |
| |
| // Starts at Long.MIN_VALUE. |
| private volatile long logBlockingReadRepairAttemptsUntilNanos = Long.MIN_VALUE; |
| |
| // Private: the only instance created in this class is the singleton declared above. |
| private StorageProxy() |
| { |
| } |
| |
| static |
| { |
|     // Register the singleton and the hints service as MBeans. |
|     MBeanWrapper.instance.registerMBean(instance, MBEAN_NAME); |
|     HintsService.instance.registerMBean(); |
| |
|     // Regular writes: hand the mutation to the hinted-replica send path, on the MUTATION stage. |
|     standardWritePerformer = (write, plan, handler, localDc) -> |
|     { |
|         assert write instanceof Mutation; |
|         sendToHintedReplicas((Mutation) write, plan, handler, localDc, Stage.MUTATION); |
|     }; |
| |
|     /* |
|      * Counter writes are executed in one of two places: directly on the coordinator when it is a replica, or |
|      * otherwise on a replica, from CounterMutationVerbHandler. Either way the write has to run on the |
|      * COUNTER_MUTATION stage. In the verb-handler case we are already running on that stage, so submitting the |
|      * work to it again would risk a deadlock; that is why there are two distinct performers: one that runs the |
|      * task inline and one that submits it to the stage. |
|      */ |
|     counterWritePerformer = (write, plan, handler, localDc) -> |
|     { |
|         EndpointsForToken remoteContacts = plan.contacts().withoutSelf(); |
|         Replicas.temporaryAssertFull(remoteContacts); // TODO CASSANDRA-14548 |
|         // Runs inline on the calling thread. |
|         counterWriteTask(write, plan.withContacts(remoteContacts), handler, localDc).run(); |
|     }; |
| |
|     counterWriteOnCoordinatorPerformer = (write, plan, handler, localDc) -> |
|     { |
|         EndpointsForToken remoteContacts = plan.contacts().withoutSelf(); |
|         Replicas.temporaryAssertFull(remoteContacts); // TODO CASSANDRA-14548 |
|         // Submitted to the COUNTER_MUTATION stage rather than run inline. |
|         Stage.COUNTER_MUTATION.executor() |
|                               .execute(counterWriteTask(write, plan.withContacts(remoteContacts), handler, localDc)); |
|     }; |
| |
|     ReadRepairMetrics.init(); |
| |
|     // Warn operators when the configured paxos variant does not provide linearizable SERIAL reads. |
|     if (!Paxos.isLinearizable()) |
|     { |
|         logger.warn("This node was started with paxos variant {}. SERIAL (and LOCAL_SERIAL) reads coordinated by this node " + |
|                     "will not offer linearizability (see CASSANDRA-12126 for details on what this means) with " + |
|                     "respect to other SERIAL operations. Please note that with this variant, SERIAL reads will be " + |
|                     "slower than QUORUM reads, yet offer no additional guarantees. This flag should only be used in " + |
|                     "the restricted case of upgrading from a pre-CASSANDRA-12126 version, and only if you " + |
|                     "understand the tradeoff.", Paxos.getPaxosVariant()); |
|     } |
| } |
| |
| /** |
|  * Compare-and-set on a single partition: the updates described by {@code request} are applied if and only if |
|  * the current values in the row for {@code key} match the conditions described by {@code request}. |
|  * |
|  * <p>The algorithm is "raw" Paxos, that is Paxos minus leader election: any node in the cluster may propose |
|  * changes for any row, and the row (not a single column) is the unit of value being proposed. The Paxos cohort |
|  * consists only of the replicas for the given key rather than the entire cluster, so performance is expected |
|  * to be reasonable, but CAS is still intended for "when you really need it", not for all updates. |
|  * |
|  * <p>Paxos has three phases: |
|  * <ol> |
|  *   <li>Prepare: the coordinator generates a ballot (a timeUUID here) and asks replicas to (a) promise not to |
|  *       accept updates from older ballots and (b) report the most recent update they have already accepted.</li> |
|  *   <li>Accept: if a majority of replicas respond, the coordinator asks replicas to accept the value of the |
|  *       highest proposal ballot it heard about, or a new value if no in-progress proposal was reported.</li> |
|  *   <li>Commit (Learn): if a majority of replicas acknowledge the accept request, the new value can be |
|  *       committed.</li> |
|  * </ol> |
|  * |
|  * <p>The commit procedure is not covered in "Paxos Made Simple" and only briefly mentioned in "Paxos Made Live", |
|  * so the approach taken is: |
|  * <ul> |
|  *   <li>3a. The coordinator sends a commit message, with the ballot and value, to all replicas.</li> |
|  *   <li>3b. Because of phases 1-2, this is the highest-seen commit ballot. Replicas note it and send it with |
|  *       subsequent promise replies. This makes it possible to discard acceptance records for successfully |
|  *       committed replicas without letting incomplete proposals commit erroneously later on.</li> |
|  * </ul> |
|  * |
|  * <p>Because this is a CAS rather than a plain update, a read (of committed values) is performed between the |
|  * prepare and accept phases. That leaves a slightly longer window for another coordinator to come along and |
|  * trump our promise with a newer one, but is otherwise safe. |
|  * |
|  * <p>Before doing any of that, the write is rejected if the partition denylist is enabled for writes and does |
|  * not permit the key. The operation itself is delegated to {@code Paxos.cas} when {@code Paxos.useV2()} holds, |
|  * and to {@link #legacyCas} otherwise. |
|  * |
|  * @param keyspaceName the keyspace for the CAS |
|  * @param cfName the column family for the CAS |
|  * @param key the row key for the row to CAS |
|  * @param request the conditions for the CAS to apply as well as the update to perform if the conditions hold. |
|  * @param consistencyForPaxos the consistency for the paxos prepare and propose round. This can only be either |
|  *                            SERIAL or LOCAL_SERIAL. |
|  * @param consistencyForCommit the consistency for the write done during the commit phase. This can be anything |
|  *                             except SERIAL or LOCAL_SERIAL. |
|  * @param clientState the client state, handed on to whichever implementation performs the CAS |
|  * @param nowInSeconds handed on to the legacy implementation only |
|  * @param queryStartNanoTime handed on to the legacy implementation only |
|  * |
|  * @return null if the operation succeeds in updating the row, or the current values corresponding to the |
|  * conditions (since, if the CAS doesn't succeed, the current values do not match the conditions). |
|  * @throws InvalidRequestException if the partition is denylisted for writes |
|  */ |
| public static RowIterator cas(String keyspaceName, |
|                               String cfName, |
|                               DecoratedKey key, |
|                               CASRequest request, |
|                               ConsistencyLevel consistencyForPaxos, |
|                               ConsistencyLevel consistencyForCommit, |
|                               ClientState clientState, |
|                               int nowInSeconds, |
|                               long queryStartNanoTime) |
| throws UnavailableException, IsBootstrappingException, RequestFailureException, RequestTimeoutException, InvalidRequestException, CasWriteUnknownResultException |
| { |
|     // The denylist is only consulted when both the feature and its write enforcement are switched on. |
|     if (DatabaseDescriptor.getPartitionDenylistEnabled() && DatabaseDescriptor.getDenylistWritesEnabled()) |
|     { |
|         boolean permitted = partitionDenylist.isKeyPermitted(keyspaceName, cfName, key.getKey()); |
|         if (!permitted) |
|         { |
|             denylistMetrics.incrementWritesRejected(); |
|             throw new InvalidRequestException(String.format("Unable to CAS write to denylisted partition [0x%s] in %s/%s", |
|                                                             key.toString(), keyspaceName, cfName)); |
|         } |
|     } |
| |
|     if (Paxos.useV2()) |
|         return Paxos.cas(key, request, consistencyForPaxos, consistencyForCommit, clientState); |
| |
|     return legacyCas(keyspaceName, cfName, key, request, consistencyForPaxos, consistencyForCommit, clientState, nowInSeconds, queryStartNanoTime); |
| } |
| |
| /** |
|  * CAS implementation used by {@link #cas} when {@code Paxos.useV2()} does not hold. |
|  * |
|  * <p>It validates the table, builds the function that (given a prepared ballot) reads the current values, |
|  * checks the CAS conditions and produces the update to propose, and then runs the Paxos rounds through |
|  * {@code doPaxos}. CAS write metrics, and the write metrics for {@code consistencyForPaxos}, are updated for |
|  * timeouts, aborts, failures, unavailability and latency; the exception itself is always propagated. |
|  * |
|  * @return null if the update was applied, otherwise the current values that failed the conditions |
|  */ |
| public static RowIterator legacyCas(String keyspaceName, |
|                                     String cfName, |
|                                     DecoratedKey key, |
|                                     CASRequest request, |
|                                     ConsistencyLevel consistencyForPaxos, |
|                                     ConsistencyLevel consistencyForCommit, |
|                                     ClientState clientState, |
|                                     int nowInSeconds, |
|                                     long queryStartNanoTime) |
| throws UnavailableException, IsBootstrappingException, RequestFailureException, RequestTimeoutException, InvalidRequestException |
| { |
|     final long startedAtNanos = nanoTime(); |
|     try |
|     { |
|         TableMetadata metadata = Schema.instance.validateTable(keyspaceName, cfName); |
| |
|         Function<Ballot, Pair<PartitionUpdate, RowIterator>> updateProposer = ballot -> |
|         { |
|             // Step 1: read the current values so the conditions can be checked against them. |
|             Tracing.trace("Reading existing values for CAS precondition"); |
|             SinglePartitionReadCommand readCommand = (SinglePartitionReadCommand) request.readCommand(nowInSeconds); |
|             ConsistencyLevel readConsistency; |
|             if (consistencyForPaxos == ConsistencyLevel.LOCAL_SERIAL) |
|                 readConsistency = ConsistencyLevel.LOCAL_QUORUM; |
|             else |
|                 readConsistency = ConsistencyLevel.QUORUM; |
| |
|             FilteredPartition current; |
|             try (RowIterator rowIter = readOne(readCommand, readConsistency, queryStartNanoTime)) |
|             { |
|                 current = FilteredPartition.create(rowIter); |
|             } |
| |
|             // Step 2: conditions not met -> propose an empty update and hand back the current values. |
|             if (!request.appliesTo(current)) |
|             { |
|                 Tracing.trace("CAS precondition does not match current values {}", current); |
|                 casWriteMetrics.conditionNotMet.inc(); |
|                 return Pair.create(PartitionUpdate.emptyUpdate(metadata, key), current.rowIterator()); |
|             } |
| |
|             // Step 3: conditions met -> build the requested updates and record their size. |
|             PartitionUpdate requested = request.makeUpdates(current, clientState, ballot); |
| |
|             long requestedSize = requested.dataSize(); |
|             casWriteMetrics.mutationSize.update(requestedSize); |
|             writeMetricsForLevel(consistencyForPaxos).mutationSize.update(requestedSize); |
| |
|             // Step 4: run triggers over the CAS updates. Triggers emit Mutations, so a trigger implementation |
|             // could generate mutations for partitions other than the one this paxos round is scoped for. |
|             // TriggerExecutor validates that the generated mutations target the same partition as the initial |
|             // updates and rejects (via an InvalidRequestException) any that do not. |
|             PartitionUpdate withTriggers = TriggerExecutor.instance.execute(requested); |
| |
|             // A null second element tells doPaxos that the CAS applied. |
|             return Pair.create(withTriggers, null); |
|         }; |
| |
|         // The same commit consistency is used for replayed in-progress rounds and for this operation's own commit. |
|         return doPaxos(metadata, |
|                        key, |
|                        consistencyForPaxos, |
|                        consistencyForCommit, |
|                        consistencyForCommit, |
|                        queryStartNanoTime, |
|                        casWriteMetrics, |
|                        updateProposer); |
|     } |
|     catch (CasWriteUnknownResultException e) |
|     { |
|         casWriteMetrics.unknownResult.mark(); |
|         throw e; |
|     } |
|     catch (CasWriteTimeoutException wte) |
|     { |
|         casWriteMetrics.timeouts.mark(); |
|         writeMetricsForLevel(consistencyForPaxos).timeouts.mark(); |
|         // Re-thrown as a fresh exception carrying the same fields as the caught one. |
|         throw new CasWriteTimeoutException(wte.writeType, wte.consistency, wte.received, wte.blockFor, wte.contentions); |
|     } |
|     catch (ReadTimeoutException e) |
|     { |
|         casWriteMetrics.timeouts.mark(); |
|         writeMetricsForLevel(consistencyForPaxos).timeouts.mark(); |
|         throw e; |
|     } |
|     catch (ReadAbortException e) |
|     { |
|         casWriteMetrics.markAbort(e); |
|         writeMetricsForLevel(consistencyForPaxos).markAbort(e); |
|         throw e; |
|     } |
|     catch (WriteFailureException | ReadFailureException e) |
|     { |
|         casWriteMetrics.failures.mark(); |
|         writeMetricsForLevel(consistencyForPaxos).failures.mark(); |
|         throw e; |
|     } |
|     catch (UnavailableException e) |
|     { |
|         casWriteMetrics.unavailables.mark(); |
|         writeMetricsForLevel(consistencyForPaxos).unavailables.mark(); |
|         throw e; |
|     } |
|     finally |
|     { |
|         // Latency is recorded whether the operation succeeded or threw. |
|         final long elapsedNanos = nanoTime() - startedAtNanos; |
|         casWriteMetrics.addNano(elapsedNanos); |
|         writeMetricsForLevel(consistencyForPaxos).addNano(elapsedNanos); |
|     } |
| } |
| |
| /** |
|  * Records the contention seen by a CAS operation, both in the operation-level metrics and in the per-table |
|  * "top CAS partition contention" sampler. Does nothing when there was no contention. |
|  * |
|  * @param table the table the operation targeted |
|  * @param key the partition the operation targeted; its raw key is what gets sampled |
|  * @param casMetrics the operation-level metrics to update |
|  * @param contentions the number of contentions to record |
|  */ |
| private static void recordCasContention(TableMetadata table, |
|                                         DecoratedKey key, |
|                                         CASClientRequestMetrics casMetrics, |
|                                         int contentions) |
| { |
|     if (contentions != 0) |
|     { |
|         casMetrics.contention.update(contentions); |
| |
|         ColumnFamilyStore store = Keyspace.open(table.keyspace).getColumnFamilyStore(table.name); |
|         store.metric.topCasPartitionContention.addSample(key.getKey(), contentions); |
|     } |
| } |
| |
| /** |
|  * Runs the Paxos rounds for one proposal, retrying whenever it is pre-empted, until the CAS contention timeout. |
|  * |
|  * <p>The customisable part is {@code createUpdateProposal}: once a ballot has been successfully 'prepared', it |
|  * is invoked to produce the update to 'propose' (and to commit, if the proposal is accepted), together with the |
|  * value this whole method returns. Because of retries it may be invoked several times, and it need not produce |
|  * the same result each time. |
|  * |
|  * @param metadata the table to update with Paxos. |
|  * @param key the partition updated. |
|  * @param consistencyForPaxos the serial consistency of the operation (either {@link ConsistencyLevel#SERIAL} or |
|  * {@link ConsistencyLevel#LOCAL_SERIAL}). |
|  * @param consistencyForReplayCommits the consistency for the commit phase of "replayed" in-progress operations. |
|  * @param consistencyForCommit the consistency for the commit phase of _this_ operation's update. |
|  * @param queryStartNanoTime the nano time at which the enclosing query started; timeouts are measured from it. |
|  * @param casMetrics the metrics to update for this operation. |
|  * @param createUpdateProposal called after a successful 'prepare' phase to obtain 1) the actual update of this |
|  * operation and 2) the result the whole method should return. It may return {@code null} in the special case |
|  * where, after having "prepared" (and thus potentially replayed in-progress updates), nothing should be |
|  * proposed; the whole method then returns {@code null}. |
|  * @return the second element of the pair returned by {@code createUpdateProposal} (from its last invocation, if |
|  * retries caused it to be invoked more than once). |
|  */ |
| private static RowIterator doPaxos(TableMetadata metadata, |
|                                    DecoratedKey key, |
|                                    ConsistencyLevel consistencyForPaxos, |
|                                    ConsistencyLevel consistencyForReplayCommits, |
|                                    ConsistencyLevel consistencyForCommit, |
|                                    long queryStartNanoTime, |
|                                    CASClientRequestMetrics casMetrics, |
|                                    Function<Ballot, Pair<PartitionUpdate, RowIterator>> createUpdateProposal) |
| throws UnavailableException, IsBootstrappingException, RequestFailureException, RequestTimeoutException, InvalidRequestException |
| { |
|     int contentions = 0; |
|     Keyspace keyspace = Keyspace.open(metadata.keyspace); |
|     AbstractReplicationStrategy latestRs = keyspace.getReplicationStrategy(); |
|     try |
|     { |
|         consistencyForPaxos.validateForCas(); |
|         consistencyForReplayCommits.validateForCasCommit(latestRs); |
|         consistencyForCommit.validateForCasCommit(latestRs); |
| |
|         final long contentionTimeoutNanos = DatabaseDescriptor.getCasContentionTimeout(NANOSECONDS); |
|         while (nanoTime() - queryStartNanoTime < contentionTimeoutNanos) |
|         { |
|             // For simplicity, liveness is checked only once, at the start of each attempt. |
|             ReplicaPlan.ForPaxosWrite replicaPlan = ReplicaPlans.forPaxos(keyspace, key, consistencyForPaxos); |
|             latestRs = replicaPlan.replicationStrategy(); |
|             PaxosBallotAndContention prepared = beginAndRepairPaxos(queryStartNanoTime, |
|                                                                     key, |
|                                                                     metadata, |
|                                                                     replicaPlan, |
|                                                                     consistencyForPaxos, |
|                                                                     consistencyForReplayCommits, |
|                                                                     casMetrics); |
| |
|             final Ballot ballot = prepared.ballot; |
|             contentions += prepared.contentions; |
| |
|             Pair<PartitionUpdate, RowIterator> proposed = createUpdateProposal.apply(ballot); |
|             // Per the method javadoc, a null pair means "stop here and return null". |
|             if (proposed == null) |
|                 return null; |
| |
|             Commit proposal = Commit.newProposal(ballot, proposed.left); |
|             Tracing.trace("CAS precondition is met; proposing client-requested updates for {}", ballot); |
| |
|             boolean accepted = proposePaxos(proposal, replicaPlan, true, queryStartNanoTime); |
|             if (!accepted) |
|             { |
|                 Tracing.trace("Paxos proposal not accepted (pre-empted by a higher ballot)"); |
|                 contentions++; |
| |
|                 // Back off for a random delay below 100ms, then retry from the prepare phase. |
|                 Uninterruptibles.sleepUninterruptibly(ThreadLocalRandom.current().nextInt(100), TimeUnit.MILLISECONDS); |
|                 continue; |
|             } |
| |
|             // Accepted updates that are empty are not committed. This optimization is valid because the same |
|             // empty updates are also not replayed in beginAndRepairPaxos (see the longer comment there). Empty |
|             // updates are fairly common (serial reads and non-applying CAS propose them), so it is worthwhile. |
|             if (!proposal.update.isEmpty()) |
|                 commitPaxos(proposal, consistencyForCommit, true, queryStartNanoTime); |
| |
|             RowIterator result = proposed.right; |
|             Tracing.trace(result != null ? "CAS did not apply" : "CAS applied successfully"); |
|             return result; |
|         } |
|     } |
|     catch (CasWriteTimeoutException e) |
|     { |
|         // May come from beginAndRepairPaxos. The contention that built up inside that call before it timed out |
|         // has not been added to the local 'contentions' counter yet; add it now so that the value recorded in |
|         // the finally block is correct. |
|         contentions += e.contentions; |
|         throw e; |
|     } |
|     catch (WriteTimeoutException e) |
|     { |
|         // May come from proposePaxos or commitPaxos. |
|         throw new CasWriteTimeoutException(e.writeType, e.consistency, e.received, e.blockFor, contentions); |
|     } |
|     finally |
|     { |
|         recordCasContention(metadata, key, casMetrics, contentions); |
|     } |
| |
|     // Only reached when the retry loop ran out of time without returning. |
|     throw new CasWriteTimeoutException(WriteType.CAS, consistencyForPaxos, 0, consistencyForPaxos.blockFor(latestRs), contentions); |
| } |
| |
| /** |
| * begin a Paxos session by sending a prepare request and completing any in-progress requests seen in the replies |
| * |
| * @return the Paxos ballot promised by the replicas if no in-progress requests were seen and a quorum of |
| * nodes have seen the mostRecentCommit. Otherwise, return null. |
| */ |
| private static PaxosBallotAndContention beginAndRepairPaxos(long queryStartNanoTime, |
| DecoratedKey key, |
| TableMetadata metadata, |
| ReplicaPlan.ForPaxosWrite paxosPlan, |
| ConsistencyLevel consistencyForPaxos, |
| ConsistencyLevel consistencyForCommit, |
| CASClientRequestMetrics casMetrics) |
| throws WriteTimeoutException, WriteFailureException |
| { |
| long timeoutNanos = DatabaseDescriptor.getCasContentionTimeout(NANOSECONDS); |
| |
| PrepareCallback summary = null; |
| int contentions = 0; |
| while (nanoTime() - queryStartNanoTime < timeoutNanos) |
| { |
| // We want a timestamp that is guaranteed to be unique for that node (so that the ballot is globally unique), but if we've got a prepare rejected |
| // already we also want to make sure we pick a timestamp that has a chance to be promised, i.e. one that is greater that the most recently known |
| // in progress (#5667). Lastly, we don't want to use a timestamp that is older than the last one assigned by ClientState or operations may appear |
| // out-of-order (#7801). |
| long minTimestampMicrosToUse = summary == null ? Long.MIN_VALUE : 1 + summary.mostRecentInProgressCommit.ballot.unixMicros(); |
| // Note that ballotMicros is not guaranteed to be unique if two proposal are being handled concurrently by the same coordinator. But we still |
| // need ballots to be unique for each proposal so we have to use getRandomTimeUUIDFromMicros. |
| Ballot ballot = nextBallot(minTimestampMicrosToUse, consistencyForPaxos == SERIAL ? GLOBAL : LOCAL); |
| |
| // prepare |
| try |
| { |
| Tracing.trace("Preparing {}", ballot); |
| Commit toPrepare = Commit.newPrepare(key, metadata, ballot); |
| summary = preparePaxos(toPrepare, paxosPlan, queryStartNanoTime); |
| if (!summary.promised) |
| { |
| Tracing.trace("Some replicas have already promised a higher ballot than ours; aborting"); |
| contentions++; |
| // sleep a random amount to give the other proposer a chance to finish |
| Uninterruptibles.sleepUninterruptibly(ThreadLocalRandom.current().nextInt(100), MILLISECONDS); |
| continue; |
| } |
| |
| Commit inProgress = summary.mostRecentInProgressCommit; |
| Commit mostRecent = summary.mostRecentCommit; |
| |
| // If we have an in-progress ballot greater than the MRC we know, then it's an in-progress round that |
| // needs to be completed, so do it. |
| // One special case we make is for update that are empty (which are proposed by serial reads and |
| // non-applying CAS). While we could handle those as any other updates, we can optimize this somewhat by |
| // neither committing those empty updates, nor replaying in-progress ones. The reasoning is this: as the |
| // update is empty, we have nothing to apply to storage in the commit phase, so the only reason to commit |
| // would be to update the MRC. However, if we skip replaying those empty updates, then we don't need to |
| // update the MRC for following updates to make progress (that is, if we didn't had the empty update skip |
| // below _but_ skipped updating the MRC on empty updates, then we'd be stuck always proposing that same |
| // empty update). And the reason skipping that replay is safe is that when an operation tries to propose |
| // an empty value, there can be only 2 cases: |
| // 1) the propose succeed, meaning a quorum of nodes accept it, in which case we are guaranteed no earlier |
| // pending operation can ever be replayed (which is what we want to guarantee with the empty update). |
| // 2) the propose does not succeed. But then the operation proposing the empty update will not succeed |
| // either (it will retry or ultimately timeout), and we're actually ok if earlier pending operation gets |
| // replayed in that case. |
| // Tl;dr, it is safe to skip committing empty updates _as long as_ we also skip replying them below. And |
| // doing is more efficient, so we do so. |
| if (!inProgress.update.isEmpty() && inProgress.isAfter(mostRecent)) |
| { |
| Tracing.trace("Finishing incomplete paxos round {}", inProgress); |
| casMetrics.unfinishedCommit.inc(); |
| Commit refreshedInProgress = Commit.newProposal(ballot, inProgress.update); |
| if (proposePaxos(refreshedInProgress, paxosPlan, false, queryStartNanoTime)) |
| { |
| commitPaxos(refreshedInProgress, consistencyForCommit, false, queryStartNanoTime); |
| } |
| else |
| { |
| Tracing.trace("Some replicas have already promised a higher ballot than ours; aborting"); |
| // sleep a random amount to give the other proposer a chance to finish |
| contentions++; |
| Uninterruptibles.sleepUninterruptibly(ThreadLocalRandom.current().nextInt(100), MILLISECONDS); |
| } |
| continue; |
| } |
| |
| // To be able to propose our value on a new round, we need a quorum of replica to have learn the previous one. Why is explained at: |
| // https://issues.apache.org/jira/browse/CASSANDRA-5062?focusedCommentId=13619810&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-13619810) |
| // Since we waited for quorum nodes, if some of them haven't seen the last commit (which may just be a timing issue, but may also |
| // mean we lost messages), we pro-actively "repair" those nodes, and retry. |
| Iterable<InetAddressAndPort> missingMRC = summary.replicasMissingMostRecentCommit(); |
| if (Iterables.size(missingMRC) > 0) |
| { |
| Tracing.trace("Repairing replicas that missed the most recent commit"); |
| sendCommit(mostRecent, missingMRC); |
| // TODO: provided commits don't invalidate the prepare we just did above (which they don't), we could just wait |
| // for all the missingMRC to acknowledge this commit and then move on with proposing our value. But that means |
| // adding the ability to have commitPaxos block, which is exactly what CASSANDRA-5442 will do. So once we have that |
| // latter ticket, we can pass CL.ALL to the commit above and remove the 'continue'. |
| continue; |
| } |
| |
| return new PaxosBallotAndContention(ballot, contentions); |
| } |
| catch (WriteTimeoutException e) |
| { |
| // We're still doing preparation for the paxos rounds, so we want to use the CAS (see CASSANDRA-8672) |
| throw new CasWriteTimeoutException(WriteType.CAS, e.consistency, e.received, e.blockFor, contentions); |
| } |
| } |
| |
| throw new CasWriteTimeoutException(WriteType.CAS, consistencyForPaxos, 0, consistencyForPaxos.blockFor(paxosPlan.replicationStrategy()), contentions); |
| } |
| |
| /** |
| * Unlike commitPaxos, this does not wait for replies |
| */ |
| private static void sendCommit(Commit commit, Iterable<InetAddressAndPort> replicas) |
| { |
| Message<Commit> message = Message.out(PAXOS_COMMIT_REQ, commit); |
| for (InetAddressAndPort target : replicas) |
| MessagingService.instance().send(message, target); |
| } |
| |
| private static PrepareCallback preparePaxos(Commit toPrepare, ReplicaPlan.ForPaxosWrite replicaPlan, long queryStartNanoTime) |
| throws WriteTimeoutException |
| { |
| PrepareCallback callback = new PrepareCallback(toPrepare.update.partitionKey(), toPrepare.update.metadata(), replicaPlan.requiredParticipants(), replicaPlan.consistencyLevel(), queryStartNanoTime); |
| Message<Commit> message = Message.out(PAXOS_PREPARE_REQ, toPrepare); |
| |
| boolean hasLocalRequest = false; |
| |
| for (Replica replica: replicaPlan.contacts()) |
| { |
| if (replica.isSelf()) |
| { |
| hasLocalRequest = true; |
| PAXOS_PREPARE_REQ.stage.execute(() -> { |
| try |
| { |
| callback.onResponse(message.responseWith(doPrepare(toPrepare))); |
| } |
| catch (Exception ex) |
| { |
| logger.error("Failed paxos prepare locally", ex); |
| } |
| }); |
| } |
| else |
| { |
| MessagingService.instance().sendWithCallback(message, replica.endpoint(), callback); |
| } |
| } |
| |
| if (hasLocalRequest) |
| writeMetrics.localRequests.mark(); |
| else |
| writeMetrics.remoteRequests.mark(); |
| |
| callback.await(); |
| return callback; |
| } |
| |
| /** |
| * Propose the {@param proposal} accoding to the {@param replicaPlan}. |
| * When {@param backoffIfPartial} is true, the proposer backs off when seeing the proposal being accepted by some but not a quorum. |
| * The result of the cooresponding CAS in uncertain as the accepted proposal may or may not be spread to other nodes in later rounds. |
| */ |
| private static boolean proposePaxos(Commit proposal, ReplicaPlan.ForPaxosWrite replicaPlan, boolean backoffIfPartial, long queryStartNanoTime) |
| throws WriteTimeoutException, CasWriteUnknownResultException |
| { |
| ProposeCallback callback = new ProposeCallback(replicaPlan.contacts().size(), replicaPlan.requiredParticipants(), !backoffIfPartial, replicaPlan.consistencyLevel(), queryStartNanoTime); |
| Message<Commit> message = Message.out(PAXOS_PROPOSE_REQ, proposal); |
| for (Replica replica : replicaPlan.contacts()) |
| { |
| if (replica.isSelf()) |
| { |
| PAXOS_PROPOSE_REQ.stage.execute(() -> { |
| try |
| { |
| Message<Boolean> response = message.responseWith(doPropose(proposal)); |
| callback.onResponse(response); |
| } |
| catch (Exception ex) |
| { |
| logger.error("Failed paxos propose locally", ex); |
| } |
| }); |
| } |
| else |
| { |
| MessagingService.instance().sendWithCallback(message, replica.endpoint(), callback); |
| } |
| } |
| callback.await(); |
| |
| if (callback.isSuccessful()) |
| return true; |
| |
| if (backoffIfPartial && !callback.isFullyRefused()) |
| throw new CasWriteUnknownResultException(replicaPlan.consistencyLevel(), callback.getAcceptCount(), replicaPlan.requiredParticipants()); |
| |
| return false; |
| } |
| |
| private static void commitPaxos(Commit proposal, ConsistencyLevel consistencyLevel, boolean allowHints, long queryStartNanoTime) throws WriteTimeoutException |
| { |
| boolean shouldBlock = consistencyLevel != ConsistencyLevel.ANY; |
| Keyspace keyspace = Keyspace.open(proposal.update.metadata().keyspace); |
| |
| Token tk = proposal.update.partitionKey().getToken(); |
| |
| AbstractWriteResponseHandler<Commit> responseHandler = null; |
| // NOTE: this ReplicaPlan is a lie, this usage of ReplicaPlan could do with being clarified - the selected() collection is essentially (I think) never used |
| ReplicaPlan.ForWrite replicaPlan = ReplicaPlans.forWrite(keyspace, consistencyLevel, tk, ReplicaPlans.writeAll); |
| if (shouldBlock) |
| { |
| AbstractReplicationStrategy rs = replicaPlan.replicationStrategy(); |
| responseHandler = rs.getWriteResponseHandler(replicaPlan, null, WriteType.SIMPLE, proposal::makeMutation, queryStartNanoTime); |
| } |
| |
| Message<Commit> message = Message.outWithFlag(PAXOS_COMMIT_REQ, proposal, MessageFlag.CALL_BACK_ON_FAILURE); |
| for (Replica replica : replicaPlan.liveAndDown()) |
| { |
| InetAddressAndPort destination = replica.endpoint(); |
| checkHintOverload(replica); |
| |
| if (replicaPlan.isAlive(replica)) |
| { |
| if (shouldBlock) |
| { |
| if (replica.isSelf()) |
| commitPaxosLocal(replica, message, responseHandler); |
| else |
| MessagingService.instance().sendWriteWithCallback(message, replica, responseHandler); |
| } |
| else |
| { |
| MessagingService.instance().send(message, destination); |
| } |
| } |
| else |
| { |
| if (responseHandler != null) |
| { |
| responseHandler.expired(); |
| } |
| if (allowHints && shouldHint(replica)) |
| { |
| submitHint(proposal.makeMutation(), replica, null); |
| } |
| } |
| } |
| |
| if (shouldBlock) |
| responseHandler.get(); |
| } |
| |
| /** |
| * Commit a PAXOS task locally, and if the task times out rather then submitting a real hint |
| * submit a fake one that executes immediately on the mutation stage, but generates the necessary backpressure |
| * signal for hints |
| */ |
| private static void commitPaxosLocal(Replica localReplica, final Message<Commit> message, final AbstractWriteResponseHandler<?> responseHandler) |
| { |
| PAXOS_COMMIT_REQ.stage.maybeExecuteImmediately(new LocalMutationRunnable(localReplica) |
| { |
| public void runMayThrow() |
| { |
| try |
| { |
| PaxosState.commitDirect(message.payload); |
| if (responseHandler != null) |
| responseHandler.onResponse(null); |
| } |
| catch (Exception ex) |
| { |
| if (!(ex instanceof WriteTimeoutException)) |
| logger.error("Failed to apply paxos commit locally : ", ex); |
| responseHandler.onFailure(FBUtilities.getBroadcastAddressAndPort(), RequestFailureReason.forException(ex)); |
| } |
| } |
| |
| @Override |
| protected Verb verb() |
| { |
| return PAXOS_COMMIT_REQ; |
| } |
| }); |
| } |
| |
| /** |
| * Use this method to have these Mutations applied |
| * across all replicas. This method will take care |
| * of the possibility of a replica being down and hint |
| * the data across to some other replica. |
| * |
| * @param mutations the mutations to be applied across the replicas |
| * @param consistencyLevel the consistency level for the operation |
| * @param queryStartNanoTime the value of nanoTime() when the query started to be processed |
| */ |
| public static void mutate(List<? extends IMutation> mutations, ConsistencyLevel consistencyLevel, long queryStartNanoTime) |
| throws UnavailableException, OverloadedException, WriteTimeoutException, WriteFailureException |
| { |
| Tracing.trace("Determining replicas for mutation"); |
| final String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getLocalDatacenter(); |
| |
| long startTime = nanoTime(); |
| |
| List<AbstractWriteResponseHandler<IMutation>> responseHandlers = new ArrayList<>(mutations.size()); |
| WriteType plainWriteType = mutations.size() <= 1 ? WriteType.SIMPLE : WriteType.UNLOGGED_BATCH; |
| |
| try |
| { |
| for (IMutation mutation : mutations) |
| { |
| if (mutation instanceof CounterMutation) |
| responseHandlers.add(mutateCounter((CounterMutation)mutation, localDataCenter, queryStartNanoTime)); |
| else |
| responseHandlers.add(performWrite(mutation, consistencyLevel, localDataCenter, standardWritePerformer, null, plainWriteType, queryStartNanoTime)); |
| } |
| |
| // upgrade to full quorum any failed cheap quorums |
| for (int i = 0 ; i < mutations.size() ; ++i) |
| { |
| if (!(mutations.get(i) instanceof CounterMutation)) // at the moment, only non-counter writes support cheap quorums |
| responseHandlers.get(i).maybeTryAdditionalReplicas(mutations.get(i), standardWritePerformer, localDataCenter); |
| } |
| |
| // wait for writes. throws TimeoutException if necessary |
| for (AbstractWriteResponseHandler<IMutation> responseHandler : responseHandlers) |
| responseHandler.get(); |
| } |
| catch (WriteTimeoutException|WriteFailureException ex) |
| { |
| if (consistencyLevel == ConsistencyLevel.ANY) |
| { |
| hintMutations(mutations); |
| } |
| else |
| { |
| if (ex instanceof WriteFailureException) |
| { |
| writeMetrics.failures.mark(); |
| writeMetricsForLevel(consistencyLevel).failures.mark(); |
| WriteFailureException fe = (WriteFailureException)ex; |
| Tracing.trace("Write failure; received {} of {} required replies, failed {} requests", |
| fe.received, fe.blockFor, fe.failureReasonByEndpoint.size()); |
| } |
| else |
| { |
| writeMetrics.timeouts.mark(); |
| writeMetricsForLevel(consistencyLevel).timeouts.mark(); |
| WriteTimeoutException te = (WriteTimeoutException)ex; |
| Tracing.trace("Write timeout; received {} of {} required replies", te.received, te.blockFor); |
| } |
| throw ex; |
| } |
| } |
| catch (UnavailableException e) |
| { |
| writeMetrics.unavailables.mark(); |
| writeMetricsForLevel(consistencyLevel).unavailables.mark(); |
| Tracing.trace("Unavailable"); |
| throw e; |
| } |
| catch (OverloadedException e) |
| { |
| writeMetrics.unavailables.mark(); |
| writeMetricsForLevel(consistencyLevel).unavailables.mark(); |
| Tracing.trace("Overloaded"); |
| throw e; |
| } |
| finally |
| { |
| long latency = nanoTime() - startTime; |
| writeMetrics.addNano(latency); |
| writeMetricsForLevel(consistencyLevel).addNano(latency); |
| updateCoordinatorWriteLatencyTableMetric(mutations, latency); |
| } |
| } |
| |
| /** |
| * Hint all the mutations (except counters, which can't be safely retried). This means |
| * we'll re-hint any successful ones; doesn't seem worth it to track individual success |
| * just for this unusual case. |
| * |
| * Only used for CL.ANY |
| * |
| * @param mutations the mutations that require hints |
| */ |
| private static void hintMutations(Collection<? extends IMutation> mutations) |
| { |
| for (IMutation mutation : mutations) |
| if (!(mutation instanceof CounterMutation)) |
| hintMutation((Mutation) mutation); |
| |
| Tracing.trace("Wrote hints to satisfy CL.ANY after no replicas acknowledged the write"); |
| } |
| |
| private static void hintMutation(Mutation mutation) |
| { |
| String keyspaceName = mutation.getKeyspaceName(); |
| Token token = mutation.key().getToken(); |
| |
| // local writes can timeout, but cannot be dropped (see LocalMutationRunnable and CASSANDRA-6510), |
| // so there is no need to hint or retry. |
| EndpointsForToken replicasToHint = ReplicaLayout.forTokenWriteLiveAndDown(Keyspace.open(keyspaceName), token) |
| .all() |
| .filter(StorageProxy::shouldHint); |
| |
| submitHint(mutation, replicasToHint, null); |
| } |
| |
| public boolean appliesLocally(Mutation mutation) |
| { |
| String keyspaceName = mutation.getKeyspaceName(); |
| Token token = mutation.key().getToken(); |
| InetAddressAndPort local = FBUtilities.getBroadcastAddressAndPort(); |
| |
| return ReplicaLayout.forTokenWriteLiveAndDown(Keyspace.open(keyspaceName), token) |
| .all().endpoints().contains(local); |
| } |
| |
| /** |
| * Use this method to have these Mutations applied |
| * across all replicas. |
| * |
| * @param mutations the mutations to be applied across the replicas |
| * @param writeCommitLog if commitlog should be written |
| * @param baseComplete time from epoch in ms that the local base mutation was(or will be) completed |
| * @param queryStartNanoTime the value of nanoTime() when the query started to be processed |
| */ |
| public static void mutateMV(ByteBuffer dataKey, Collection<Mutation> mutations, boolean writeCommitLog, AtomicLong baseComplete, long queryStartNanoTime) |
| throws UnavailableException, OverloadedException, WriteTimeoutException |
| { |
| Tracing.trace("Determining replicas for mutation"); |
| final String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getLocalDatacenter(); |
| |
| long startTime = nanoTime(); |
| |
| |
| try |
| { |
| // if we haven't joined the ring, write everything to batchlog because paired replicas may be stale |
| final TimeUUID batchUUID = nextTimeUUID(); |
| |
| if (StorageService.instance.isStarting() || StorageService.instance.isJoining() || StorageService.instance.isMoving()) |
| { |
| BatchlogManager.store(Batch.createLocal(batchUUID, FBUtilities.timestampMicros(), |
| mutations), writeCommitLog); |
| } |
| else |
| { |
| List<WriteResponseHandlerWrapper> wrappers = new ArrayList<>(mutations.size()); |
| //non-local mutations rely on the base mutation commit-log entry for eventual consistency |
| Set<Mutation> nonLocalMutations = new HashSet<>(mutations); |
| Token baseToken = StorageService.instance.getTokenMetadata().partitioner.getToken(dataKey); |
| |
| ConsistencyLevel consistencyLevel = ConsistencyLevel.ONE; |
| |
| //Since the base -> view replication is 1:1 we only need to store the BL locally |
| ReplicaPlan.ForWrite replicaPlan = ReplicaPlans.forLocalBatchlogWrite(); |
| BatchlogCleanup cleanup = new BatchlogCleanup(mutations.size(), |
| () -> asyncRemoveFromBatchlog(replicaPlan, batchUUID)); |
| |
| // add a handler for each mutation - includes checking availability, but doesn't initiate any writes, yet |
| for (Mutation mutation : mutations) |
| { |
| String keyspaceName = mutation.getKeyspaceName(); |
| Token tk = mutation.key().getToken(); |
| AbstractReplicationStrategy replicationStrategy = Keyspace.open(keyspaceName).getReplicationStrategy(); |
| Optional<Replica> pairedEndpoint = ViewUtils.getViewNaturalEndpoint(replicationStrategy, baseToken, tk); |
| EndpointsForToken pendingReplicas = StorageService.instance.getTokenMetadata().pendingEndpointsForToken(tk, keyspaceName); |
| |
| // if there are no paired endpoints there are probably range movements going on, so we write to the local batchlog to replay later |
| if (!pairedEndpoint.isPresent()) |
| { |
| if (pendingReplicas.isEmpty()) |
| logger.warn("Received base materialized view mutation for key {} that does not belong " + |
| "to this node. There is probably a range movement happening (move or decommission)," + |
| "but this node hasn't updated its ring metadata yet. Adding mutation to " + |
| "local batchlog to be replayed later.", |
| mutation.key()); |
| continue; |
| } |
| |
| // When local node is the endpoint we can just apply the mutation locally, |
| // unless there are pending endpoints, in which case we want to do an ordinary |
| // write so the view mutation is sent to the pending endpoint |
| if (pairedEndpoint.get().isSelf() && StorageService.instance.isJoined() |
| && pendingReplicas.isEmpty()) |
| { |
| try |
| { |
| mutation.apply(writeCommitLog); |
| nonLocalMutations.remove(mutation); |
| // won't trigger cleanup |
| cleanup.decrement(); |
| } |
| catch (Exception exc) |
| { |
| logger.error("Error applying local view update: Mutation (keyspace {}, tables {}, partition key {})", |
| mutation.getKeyspaceName(), mutation.getTableIds(), mutation.key()); |
| throw exc; |
| } |
| } |
| else |
| { |
| ReplicaLayout.ForTokenWrite liveAndDown = ReplicaLayout.forTokenWrite(replicationStrategy, |
| EndpointsForToken.of(tk, pairedEndpoint.get()), |
| pendingReplicas); |
| wrappers.add(wrapViewBatchResponseHandler(mutation, |
| consistencyLevel, |
| consistencyLevel, |
| liveAndDown, |
| baseComplete, |
| WriteType.BATCH, |
| cleanup, |
| queryStartNanoTime)); |
| } |
| } |
| |
| // Apply to local batchlog memtable in this thread |
| if (!nonLocalMutations.isEmpty()) |
| BatchlogManager.store(Batch.createLocal(batchUUID, FBUtilities.timestampMicros(), nonLocalMutations), writeCommitLog); |
| |
| // Perform remote writes |
| if (!wrappers.isEmpty()) |
| asyncWriteBatchedMutations(wrappers, localDataCenter, Stage.VIEW_MUTATION); |
| } |
| } |
| finally |
| { |
| viewWriteMetrics.addNano(nanoTime() - startTime); |
| } |
| } |
| |
| @SuppressWarnings("unchecked") |
| public static void mutateWithTriggers(List<? extends IMutation> mutations, |
| ConsistencyLevel consistencyLevel, |
| boolean mutateAtomically, |
| long queryStartNanoTime) |
| throws WriteTimeoutException, WriteFailureException, UnavailableException, OverloadedException, InvalidRequestException |
| { |
| if (DatabaseDescriptor.getPartitionDenylistEnabled() && DatabaseDescriptor.getDenylistWritesEnabled()) |
| { |
| for (final IMutation mutation : mutations) |
| { |
| for (final TableId tid : mutation.getTableIds()) |
| { |
| if (!partitionDenylist.isKeyPermitted(tid, mutation.key().getKey())) |
| { |
| denylistMetrics.incrementWritesRejected(); |
| // While Schema.instance.getTableMetadata() can return a null value, in this case the isKeyPermitted |
| // call above ensures that we cannot have a null associated tid at this point. |
| final TableMetadata tmd = Schema.instance.getTableMetadata(tid); |
| throw new InvalidRequestException(String.format("Unable to write to denylisted partition [0x%s] in %s/%s", |
| mutation.key().toString(), tmd.keyspace, tmd.name)); |
| } |
| } |
| } |
| } |
| |
| Collection<Mutation> augmented = TriggerExecutor.instance.execute(mutations); |
| |
| boolean updatesView = Keyspace.open(mutations.iterator().next().getKeyspaceName()) |
| .viewManager |
| .updatesAffectView(mutations, true); |
| |
| long size = IMutation.dataSize(mutations); |
| writeMetrics.mutationSize.update(size); |
| writeMetricsForLevel(consistencyLevel).mutationSize.update(size); |
| |
| if (augmented != null) |
| mutateAtomically(augmented, consistencyLevel, updatesView, queryStartNanoTime); |
| else |
| { |
| if (mutateAtomically || updatesView) |
| mutateAtomically((Collection<Mutation>) mutations, consistencyLevel, updatesView, queryStartNanoTime); |
| else |
| mutate(mutations, consistencyLevel, queryStartNanoTime); |
| } |
| } |
| |
| /** |
| * See mutate. Adds additional steps before and after writing a batch. |
| * Before writing the batch (but after doing availability check against the FD for the row replicas): |
| * write the entire batch to a batchlog elsewhere in the cluster. |
| * After: remove the batchlog entry (after writing hints for the batch rows, if necessary). |
| * |
| * @param mutations the Mutations to be applied across the replicas |
| * @param consistency_level the consistency level for the operation |
| * @param requireQuorumForRemove at least a quorum of nodes will see update before deleting batchlog |
| * @param queryStartNanoTime the value of nanoTime() when the query started to be processed |
| */ |
| public static void mutateAtomically(Collection<Mutation> mutations, |
| ConsistencyLevel consistency_level, |
| boolean requireQuorumForRemove, |
| long queryStartNanoTime) |
| throws UnavailableException, OverloadedException, WriteTimeoutException |
| { |
| Tracing.trace("Determining replicas for atomic batch"); |
| long startTime = nanoTime(); |
| |
| List<WriteResponseHandlerWrapper> wrappers = new ArrayList<>(mutations.size()); |
| |
| if (mutations.stream().anyMatch(mutation -> Keyspace.open(mutation.getKeyspaceName()).getReplicationStrategy().hasTransientReplicas())) |
| throw new AssertionError("Logged batches are unsupported with transient replication"); |
| |
| try |
| { |
| |
| // If we are requiring quorum nodes for removal, we upgrade consistency level to QUORUM unless we already |
| // require ALL, or EACH_QUORUM. This is so that *at least* QUORUM nodes see the update. |
| ConsistencyLevel batchConsistencyLevel = requireQuorumForRemove |
| ? ConsistencyLevel.QUORUM |
| : consistency_level; |
| |
| switch (consistency_level) |
| { |
| case ALL: |
| case EACH_QUORUM: |
| batchConsistencyLevel = consistency_level; |
| } |
| |
| ReplicaPlan.ForWrite replicaPlan = ReplicaPlans.forBatchlogWrite(batchConsistencyLevel == ConsistencyLevel.ANY); |
| |
| final TimeUUID batchUUID = nextTimeUUID(); |
| BatchlogCleanup cleanup = new BatchlogCleanup(mutations.size(), |
| () -> asyncRemoveFromBatchlog(replicaPlan, batchUUID)); |
| |
| // add a handler for each mutation - includes checking availability, but doesn't initiate any writes, yet |
| for (Mutation mutation : mutations) |
| { |
| WriteResponseHandlerWrapper wrapper = wrapBatchResponseHandler(mutation, |
| consistency_level, |
| batchConsistencyLevel, |
| WriteType.BATCH, |
| cleanup, |
| queryStartNanoTime); |
| // exit early if we can't fulfill the CL at this time. |
| wrappers.add(wrapper); |
| } |
| |
| // write to the batchlog |
| syncWriteToBatchlog(mutations, replicaPlan, batchUUID, queryStartNanoTime); |
| |
| // now actually perform the writes and wait for them to complete |
| syncWriteBatchedMutations(wrappers, Stage.MUTATION); |
| } |
| catch (UnavailableException e) |
| { |
| writeMetrics.unavailables.mark(); |
| writeMetricsForLevel(consistency_level).unavailables.mark(); |
| Tracing.trace("Unavailable"); |
| throw e; |
| } |
| catch (WriteTimeoutException e) |
| { |
| writeMetrics.timeouts.mark(); |
| writeMetricsForLevel(consistency_level).timeouts.mark(); |
| Tracing.trace("Write timeout; received {} of {} required replies", e.received, e.blockFor); |
| throw e; |
| } |
| catch (WriteFailureException e) |
| { |
| writeMetrics.failures.mark(); |
| writeMetricsForLevel(consistency_level).failures.mark(); |
| Tracing.trace("Write failure; received {} of {} required replies", e.received, e.blockFor); |
| throw e; |
| } |
| finally |
| { |
| long latency = nanoTime() - startTime; |
| writeMetrics.addNano(latency); |
| writeMetricsForLevel(consistency_level).addNano(latency); |
| updateCoordinatorWriteLatencyTableMetric(mutations, latency); |
| } |
| } |
| |
| private static void updateCoordinatorWriteLatencyTableMetric(Collection<? extends IMutation> mutations, long latency) |
| { |
| if (null == mutations) |
| { |
| return; |
| } |
| |
| try |
| { |
| //We could potentially pass a callback into performWrite. And add callback provision for mutateCounter or mutateAtomically (sendToHintedEndPoints) |
| //However, Trade off between write metric per CF accuracy vs performance hit due to callbacks. Similar issue exists with CoordinatorReadLatency metric. |
| mutations.stream() |
| .flatMap(m -> m.getTableIds().stream().map(tableId -> Keyspace.open(m.getKeyspaceName()).getColumnFamilyStore(tableId))) |
| .distinct() |
| .forEach(store -> store.metric.coordinatorWriteLatency.update(latency, TimeUnit.NANOSECONDS)); |
| } |
| catch (Exception ex) |
| { |
| logger.warn("Exception occurred updating coordinatorWriteLatency metric", ex); |
| } |
| } |
| |
| private static void syncWriteToBatchlog(Collection<Mutation> mutations, ReplicaPlan.ForWrite replicaPlan, TimeUUID uuid, long queryStartNanoTime) |
| throws WriteTimeoutException, WriteFailureException |
| { |
| WriteResponseHandler<?> handler = new WriteResponseHandler(replicaPlan, |
| WriteType.BATCH_LOG, |
| null, |
| queryStartNanoTime); |
| |
| Batch batch = Batch.createLocal(uuid, FBUtilities.timestampMicros(), mutations); |
| Message<Batch> message = Message.out(BATCH_STORE_REQ, batch); |
| for (Replica replica : replicaPlan.liveAndDown()) |
| { |
| logger.trace("Sending batchlog store request {} to {} for {} mutations", batch.id, replica, batch.size()); |
| |
| if (replica.isSelf()) |
| performLocally(Stage.MUTATION, replica, () -> BatchlogManager.store(batch), handler); |
| else |
| MessagingService.instance().sendWithCallback(message, replica.endpoint(), handler); |
| } |
| handler.get(); |
| } |
| |
| private static void asyncRemoveFromBatchlog(ReplicaPlan.ForWrite replicaPlan, TimeUUID uuid) |
| { |
| Message<TimeUUID> message = Message.out(Verb.BATCH_REMOVE_REQ, uuid); |
| for (Replica target : replicaPlan.contacts()) |
| { |
| if (logger.isTraceEnabled()) |
| logger.trace("Sending batchlog remove request {} to {}", uuid, target); |
| |
| if (target.isSelf()) |
| performLocally(Stage.MUTATION, target, () -> BatchlogManager.remove(uuid)); |
| else |
| MessagingService.instance().send(message, target.endpoint()); |
| } |
| } |
| |
| private static void asyncWriteBatchedMutations(List<WriteResponseHandlerWrapper> wrappers, String localDataCenter, Stage stage) |
| { |
| for (WriteResponseHandlerWrapper wrapper : wrappers) |
| { |
| Replicas.temporaryAssertFull(wrapper.handler.replicaPlan.liveAndDown()); // TODO: CASSANDRA-14549 |
| ReplicaPlan.ForWrite replicas = wrapper.handler.replicaPlan.withContacts(wrapper.handler.replicaPlan.liveAndDown()); |
| |
| try |
| { |
| sendToHintedReplicas(wrapper.mutation, replicas, wrapper.handler, localDataCenter, stage); |
| } |
| catch (OverloadedException | WriteTimeoutException e) |
| { |
| wrapper.handler.onFailure(FBUtilities.getBroadcastAddressAndPort(), RequestFailureReason.forException(e)); |
| } |
| } |
| } |
| |
| private static void syncWriteBatchedMutations(List<WriteResponseHandlerWrapper> wrappers, Stage stage) |
| throws WriteTimeoutException, OverloadedException |
| { |
| String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getLocalDatacenter(); |
| |
| for (WriteResponseHandlerWrapper wrapper : wrappers) |
| { |
| EndpointsForToken sendTo = wrapper.handler.replicaPlan.liveAndDown(); |
| Replicas.temporaryAssertFull(sendTo); // TODO: CASSANDRA-14549 |
| sendToHintedReplicas(wrapper.mutation, wrapper.handler.replicaPlan.withContacts(sendTo), wrapper.handler, localDataCenter, stage); |
| } |
| |
| for (WriteResponseHandlerWrapper wrapper : wrappers) |
| wrapper.handler.get(); |
| } |
| |
| /** |
| * Perform the write of a mutation given a WritePerformer. |
| * Gather the list of write endpoints, apply locally and/or forward the mutation to |
| * said write endpoint (deletaged to the actual WritePerformer) and wait for the |
| * responses based on consistency level. |
| * |
| * @param mutation the mutation to be applied |
| * @param consistencyLevel the consistency level for the write operation |
| * @param performer the WritePerformer in charge of appliying the mutation |
| * given the list of write endpoints (either standardWritePerformer for |
| * standard writes or counterWritePerformer for counter writes). |
| * @param callback an optional callback to be run if and when the write is |
| * @param queryStartNanoTime the value of nanoTime() when the query started to be processed |
| */ |
| public static AbstractWriteResponseHandler<IMutation> performWrite(IMutation mutation, |
| ConsistencyLevel consistencyLevel, |
| String localDataCenter, |
| WritePerformer performer, |
| Runnable callback, |
| WriteType writeType, |
| long queryStartNanoTime) |
| { |
| String keyspaceName = mutation.getKeyspaceName(); |
| Keyspace keyspace = Keyspace.open(keyspaceName); |
| Token tk = mutation.key().getToken(); |
| |
| ReplicaPlan.ForWrite replicaPlan = ReplicaPlans.forWrite(keyspace, consistencyLevel, tk, ReplicaPlans.writeNormal); |
| |
| if (replicaPlan.lookup(FBUtilities.getBroadcastAddressAndPort()) != null) |
| writeMetrics.localRequests.mark(); |
| else |
| writeMetrics.remoteRequests.mark(); |
| |
| AbstractReplicationStrategy rs = replicaPlan.replicationStrategy(); |
| AbstractWriteResponseHandler<IMutation> responseHandler = rs.getWriteResponseHandler(replicaPlan, callback, writeType, mutation.hintOnFailure(), queryStartNanoTime); |
| |
| performer.apply(mutation, replicaPlan, responseHandler, localDataCenter); |
| return responseHandler; |
| } |
| |
| // same as performWrites except does not initiate writes (but does perform availability checks). |
| private static WriteResponseHandlerWrapper wrapBatchResponseHandler(Mutation mutation, |
| ConsistencyLevel consistencyLevel, |
| ConsistencyLevel batchConsistencyLevel, |
| WriteType writeType, |
| BatchlogResponseHandler.BatchlogCleanup cleanup, |
| long queryStartNanoTime) |
| { |
| Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName()); |
| Token tk = mutation.key().getToken(); |
| |
| ReplicaPlan.ForWrite replicaPlan = ReplicaPlans.forWrite(keyspace, consistencyLevel, tk, ReplicaPlans.writeNormal); |
| |
| if (replicaPlan.lookup(FBUtilities.getBroadcastAddressAndPort()) != null) |
| writeMetrics.localRequests.mark(); |
| else |
| writeMetrics.remoteRequests.mark(); |
| |
| AbstractReplicationStrategy rs = replicaPlan.replicationStrategy(); |
| AbstractWriteResponseHandler<IMutation> writeHandler = rs.getWriteResponseHandler(replicaPlan, null, writeType, mutation, queryStartNanoTime); |
| BatchlogResponseHandler<IMutation> batchHandler = new BatchlogResponseHandler<>(writeHandler, batchConsistencyLevel.blockFor(rs), cleanup, queryStartNanoTime); |
| return new WriteResponseHandlerWrapper(batchHandler, mutation); |
| } |
| |
| /** |
| * Same as performWrites except does not initiate writes (but does perform availability checks). |
| * Keeps track of ViewWriteMetrics |
| */ |
| private static WriteResponseHandlerWrapper wrapViewBatchResponseHandler(Mutation mutation, |
| ConsistencyLevel consistencyLevel, |
| ConsistencyLevel batchConsistencyLevel, |
| ReplicaLayout.ForTokenWrite liveAndDown, |
| AtomicLong baseComplete, |
| WriteType writeType, |
| BatchlogResponseHandler.BatchlogCleanup cleanup, |
| long queryStartNanoTime) |
| { |
| Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName()); |
| ReplicaPlan.ForWrite replicaPlan = ReplicaPlans.forWrite(keyspace, consistencyLevel, liveAndDown, ReplicaPlans.writeAll); |
| AbstractReplicationStrategy replicationStrategy = replicaPlan.replicationStrategy(); |
| AbstractWriteResponseHandler<IMutation> writeHandler = replicationStrategy.getWriteResponseHandler(replicaPlan, () -> { |
| long delay = Math.max(0, currentTimeMillis() - baseComplete.get()); |
| viewWriteMetrics.viewWriteLatency.update(delay, MILLISECONDS); |
| }, writeType, mutation, queryStartNanoTime); |
| BatchlogResponseHandler<IMutation> batchHandler = new ViewWriteMetricsWrapped(writeHandler, batchConsistencyLevel.blockFor(replicationStrategy), cleanup, queryStartNanoTime); |
| return new WriteResponseHandlerWrapper(batchHandler, mutation); |
| } |
| |
| // used by atomic_batch_mutate to decouple availability check from the write itself, caches consistency level and endpoints. |
| private static class WriteResponseHandlerWrapper |
| { |
| final BatchlogResponseHandler<IMutation> handler; |
| final Mutation mutation; |
| |
| WriteResponseHandlerWrapper(BatchlogResponseHandler<IMutation> handler, Mutation mutation) |
| { |
| this.handler = handler; |
| this.mutation = mutation; |
| } |
| } |
| |
| /** |
| * Send the mutations to the right targets, write it locally if it corresponds or writes a hint when the node |
| * is not available. |
| * |
| * Note about hints: |
| * <pre> |
| * {@code |
| * | Hinted Handoff | Consist. Level | |
| * | on | >=1 | --> wait for hints. We DO NOT notify the handler with handler.response() for hints; |
| * | on | ANY | --> wait for hints. Responses count towards consistency. |
| * | off | >=1 | --> DO NOT fire hints. And DO NOT wait for them to complete. |
| * | off | ANY | --> DO NOT fire hints. And DO NOT wait for them to complete. |
| * } |
| * </pre> |
| * |
| * @throws OverloadedException if the hints cannot be written/enqueued |
| */ |
| public static void sendToHintedReplicas(final Mutation mutation, |
| ReplicaPlan.ForWrite plan, |
| AbstractWriteResponseHandler<IMutation> responseHandler, |
| String localDataCenter, |
| Stage stage) |
| throws OverloadedException |
| { |
| // this dc replicas: |
| Collection<Replica> localDc = null; |
| // extra-datacenter replicas, grouped by dc |
| Map<String, Collection<Replica>> dcGroups = null; |
| // only need to create a Message for non-local writes |
| Message<Mutation> message = null; |
| |
| boolean insertLocal = false; |
| Replica localReplica = null; |
| Collection<Replica> endpointsToHint = null; |
| |
| List<InetAddressAndPort> backPressureHosts = null; |
| |
| for (Replica destination : plan.contacts()) |
| { |
| checkHintOverload(destination); |
| |
| if (plan.isAlive(destination)) |
| { |
| if (destination.isSelf()) |
| { |
| insertLocal = true; |
| localReplica = destination; |
| } |
| else |
| { |
| // belongs on a different server |
| if (message == null) |
| message = Message.outWithFlag(MUTATION_REQ, mutation, MessageFlag.CALL_BACK_ON_FAILURE); |
| |
| String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(destination); |
| |
| // direct writes to local DC or old Cassandra versions |
| // (1.1 knows how to forward old-style String message IDs; updated to int in 2.0) |
| if (localDataCenter.equals(dc)) |
| { |
| if (localDc == null) |
| localDc = new ArrayList<>(plan.contacts().size()); |
| |
| localDc.add(destination); |
| } |
| else |
| { |
| if (dcGroups == null) |
| dcGroups = new HashMap<>(); |
| |
| Collection<Replica> messages = dcGroups.get(dc); |
| if (messages == null) |
| messages = dcGroups.computeIfAbsent(dc, (v) -> new ArrayList<>(3)); // most DCs will have <= 3 replicas |
| |
| messages.add(destination); |
| } |
| |
| if (backPressureHosts == null) |
| backPressureHosts = new ArrayList<>(plan.contacts().size()); |
| |
| backPressureHosts.add(destination.endpoint()); |
| } |
| } |
| else |
| { |
| //Immediately mark the response as expired since the request will not be sent |
| responseHandler.expired(); |
| if (shouldHint(destination)) |
| { |
| if (endpointsToHint == null) |
| endpointsToHint = new ArrayList<>(); |
| |
| endpointsToHint.add(destination); |
| } |
| } |
| } |
| |
| if (endpointsToHint != null) |
| submitHint(mutation, EndpointsForToken.copyOf(mutation.key().getToken(), endpointsToHint), responseHandler); |
| |
| if (insertLocal) |
| { |
| Preconditions.checkNotNull(localReplica); |
| performLocally(stage, localReplica, mutation::apply, responseHandler); |
| } |
| |
| if (localDc != null) |
| { |
| for (Replica destination : localDc) |
| MessagingService.instance().sendWriteWithCallback(message, destination, responseHandler); |
| } |
| if (dcGroups != null) |
| { |
| // for each datacenter, send the message to one node to relay the write to other replicas |
| for (Collection<Replica> dcTargets : dcGroups.values()) |
| sendMessagesToNonlocalDC(message, EndpointsForToken.copyOf(mutation.key().getToken(), dcTargets), responseHandler); |
| } |
| } |
| |
| private static void checkHintOverload(Replica destination) |
| { |
| // avoid OOMing due to excess hints. we need to do this check even for "live" nodes, since we can |
| // still generate hints for those if it's overloaded or simply dead but not yet known-to-be-dead. |
| // The idea is that if we have over maxHintsInProgress hints in flight, this is probably due to |
| // a small number of nodes causing problems, so we should avoid shutting down writes completely to |
| // healthy nodes. Any node with no hintsInProgress is considered healthy. |
| if (StorageMetrics.totalHintsInProgress.getCount() > maxHintsInProgress |
| && (getHintsInProgressFor(destination.endpoint()).get() > 0 && shouldHint(destination))) |
| { |
| throw new OverloadedException("Too many in flight hints: " + StorageMetrics.totalHintsInProgress.getCount() + |
| " destination: " + destination + |
| " destination hints: " + getHintsInProgressFor(destination.endpoint()).get()); |
| } |
| } |
| |
| /* |
| * Send the message to the first replica of targets, and have it forward the message to others in its DC |
| */ |
| private static void sendMessagesToNonlocalDC(Message<? extends IMutation> message, |
| EndpointsForToken targets, |
| AbstractWriteResponseHandler<IMutation> handler) |
| { |
| final Replica target; |
| |
| if (targets.size() > 1) |
| { |
| target = targets.get(ThreadLocalRandom.current().nextInt(0, targets.size())); |
| EndpointsForToken forwardToReplicas = targets.filter(r -> r != target, targets.size()); |
| |
| for (Replica replica : forwardToReplicas) |
| { |
| MessagingService.instance().callbacks.addWithExpiration(handler, message, replica); |
| logger.trace("Adding FWD message to {}@{}", message.id(), replica); |
| } |
| |
| // starting with 4.0, use the same message id for all replicas |
| long[] messageIds = new long[forwardToReplicas.size()]; |
| Arrays.fill(messageIds, message.id()); |
| |
| message = message.withForwardTo(new ForwardingInfo(forwardToReplicas.endpointList(), messageIds)); |
| } |
| else |
| { |
| target = targets.get(0); |
| } |
| |
| MessagingService.instance().sendWriteWithCallback(message, target, handler); |
| logger.trace("Sending message to {}@{}", message.id(), target); |
| } |
| |
| private static void performLocally(Stage stage, Replica localReplica, final Runnable runnable) |
| { |
| stage.maybeExecuteImmediately(new LocalMutationRunnable(localReplica) |
| { |
| public void runMayThrow() |
| { |
| try |
| { |
| runnable.run(); |
| } |
| catch (Exception ex) |
| { |
| logger.error("Failed to apply mutation locally : ", ex); |
| } |
| } |
| |
| @Override |
| protected Verb verb() |
| { |
| return Verb.MUTATION_REQ; |
| } |
| }); |
| } |
| |
| private static void performLocally(Stage stage, Replica localReplica, final Runnable runnable, final RequestCallback<?> handler) |
| { |
| stage.maybeExecuteImmediately(new LocalMutationRunnable(localReplica) |
| { |
| public void runMayThrow() |
| { |
| try |
| { |
| runnable.run(); |
| handler.onResponse(null); |
| } |
| catch (Exception ex) |
| { |
| if (!(ex instanceof WriteTimeoutException)) |
| logger.error("Failed to apply mutation locally : ", ex); |
| handler.onFailure(FBUtilities.getBroadcastAddressAndPort(), RequestFailureReason.forException(ex)); |
| } |
| } |
| |
| @Override |
| protected Verb verb() |
| { |
| return Verb.MUTATION_REQ; |
| } |
| }); |
| } |
| |
| /** |
| * Handle counter mutation on the coordinator host. |
| * |
| * A counter mutation needs to first be applied to a replica (that we'll call the leader for the mutation) before being |
| * replicated to the other endpoint. To achieve so, there is two case: |
| * 1) the coordinator host is a replica: we proceed to applying the update locally and replicate throug |
| * applyCounterMutationOnCoordinator |
| * 2) the coordinator is not a replica: we forward the (counter)mutation to a chosen replica (that will proceed through |
| * applyCounterMutationOnLeader upon receive) and wait for its acknowledgment. |
| * |
| * Implementation note: We check if we can fulfill the CL on the coordinator host even if he is not a replica to allow |
| * quicker response and because the WriteResponseHandlers don't make it easy to send back an error. We also always gather |
| * the write latencies at the coordinator node to make gathering point similar to the case of standard writes. |
| */ |
| public static AbstractWriteResponseHandler<IMutation> mutateCounter(CounterMutation cm, String localDataCenter, long queryStartNanoTime) throws UnavailableException, OverloadedException |
| { |
| Replica replica = findSuitableReplica(cm.getKeyspaceName(), cm.key(), localDataCenter, cm.consistency()); |
| |
| if (replica.isSelf()) |
| { |
| return applyCounterMutationOnCoordinator(cm, localDataCenter, queryStartNanoTime); |
| } |
| else |
| { |
| // Exit now if we can't fulfill the CL here instead of forwarding to the leader replica |
| String keyspaceName = cm.getKeyspaceName(); |
| Keyspace keyspace = Keyspace.open(keyspaceName); |
| Token tk = cm.key().getToken(); |
| |
| // we build this ONLY to perform the sufficiency check that happens on construction |
| ReplicaPlans.forWrite(keyspace, cm.consistency(), tk, ReplicaPlans.writeAll); |
| |
| // This host isn't a replica, so mark the request as being remote. If this host is a |
| // replica, applyCounterMutationOnCoordinator() in the branch above will call performWrite(), and |
| // there we'll mark a local request against the metrics. |
| writeMetrics.remoteRequests.mark(); |
| |
| // Forward the actual update to the chosen leader replica |
| AbstractWriteResponseHandler<IMutation> responseHandler = new WriteResponseHandler<>(ReplicaPlans.forForwardingCounterWrite(keyspace, tk, replica), |
| WriteType.COUNTER, null, queryStartNanoTime); |
| |
| Tracing.trace("Enqueuing counter update to {}", replica); |
| Message message = Message.outWithFlag(Verb.COUNTER_MUTATION_REQ, cm, MessageFlag.CALL_BACK_ON_FAILURE); |
| MessagingService.instance().sendWriteWithCallback(message, replica, responseHandler); |
| return responseHandler; |
| } |
| } |
| |
| /** |
| * Find a suitable replica as leader for counter update. |
| * For now, we pick a random replica in the local DC (or ask the snitch if |
| * there is no replica alive in the local DC). |
| * TODO: if we track the latency of the counter writes (which makes sense |
| * contrarily to standard writes since there is a read involved), we could |
| * trust the dynamic snitch entirely, which may be a better solution. It |
| * is unclear we want to mix those latencies with read latencies, so this |
| * may be a bit involved. |
| */ |
| private static Replica findSuitableReplica(String keyspaceName, DecoratedKey key, String localDataCenter, ConsistencyLevel cl) throws UnavailableException |
| { |
| Keyspace keyspace = Keyspace.open(keyspaceName); |
| IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch(); |
| AbstractReplicationStrategy replicationStrategy = keyspace.getReplicationStrategy(); |
| EndpointsForToken replicas = replicationStrategy.getNaturalReplicasForToken(key); |
| |
| // CASSANDRA-13043: filter out those endpoints not accepting clients yet, maybe because still bootstrapping |
| replicas = replicas.filter(replica -> StorageService.instance.isRpcReady(replica.endpoint())); |
| |
| // CASSANDRA-17411: filter out endpoints that are not alive |
| replicas = replicas.filter(replica -> FailureDetector.instance.isAlive(replica.endpoint())); |
| |
| // TODO have a way to compute the consistency level |
| if (replicas.isEmpty()) |
| throw UnavailableException.create(cl, cl.blockFor(replicationStrategy), 0); |
| |
| List<Replica> localReplicas = new ArrayList<>(replicas.size()); |
| |
| for (Replica replica : replicas) |
| if (snitch.getDatacenter(replica).equals(localDataCenter)) |
| localReplicas.add(replica); |
| |
| if (localReplicas.isEmpty()) |
| { |
| // If the consistency required is local then we should not involve other DCs |
| if (cl.isDatacenterLocal()) |
| throw UnavailableException.create(cl, cl.blockFor(replicationStrategy), 0); |
| |
| // No endpoint in local DC, pick the closest endpoint according to the snitch |
| replicas = snitch.sortedByProximity(FBUtilities.getBroadcastAddressAndPort(), replicas); |
| return replicas.get(0); |
| } |
| |
| return localReplicas.get(ThreadLocalRandom.current().nextInt(localReplicas.size())); |
| } |
| |
| // Must be called on a replica of the mutation. This replica becomes the |
| // leader of this mutation. |
| public static AbstractWriteResponseHandler<IMutation> applyCounterMutationOnLeader(CounterMutation cm, String localDataCenter, Runnable callback, long queryStartNanoTime) |
| throws UnavailableException, OverloadedException |
| { |
| return performWrite(cm, cm.consistency(), localDataCenter, counterWritePerformer, callback, WriteType.COUNTER, queryStartNanoTime); |
| } |
| |
| // Same as applyCounterMutationOnLeader but must with the difference that it use the MUTATION stage to execute the write (while |
| // applyCounterMutationOnLeader assumes it is on the MUTATION stage already) |
| public static AbstractWriteResponseHandler<IMutation> applyCounterMutationOnCoordinator(CounterMutation cm, String localDataCenter, long queryStartNanoTime) |
| throws UnavailableException, OverloadedException |
| { |
| return performWrite(cm, cm.consistency(), localDataCenter, counterWriteOnCoordinatorPerformer, null, WriteType.COUNTER, queryStartNanoTime); |
| } |
| |
| private static Runnable counterWriteTask(final IMutation mutation, |
| final ReplicaPlan.ForWrite replicaPlan, |
| final AbstractWriteResponseHandler<IMutation> responseHandler, |
| final String localDataCenter) |
| { |
| return new DroppableRunnable(Verb.COUNTER_MUTATION_REQ) |
| { |
| @Override |
| public void runMayThrow() throws OverloadedException, WriteTimeoutException |
| { |
| assert mutation instanceof CounterMutation; |
| |
| Mutation result = ((CounterMutation) mutation).applyCounterMutation(); |
| responseHandler.onResponse(null); |
| sendToHintedReplicas(result, replicaPlan, responseHandler, localDataCenter, Stage.COUNTER_MUTATION); |
| } |
| }; |
| } |
| |
| private static boolean systemKeyspaceQuery(List<? extends ReadCommand> cmds) |
| { |
| for (ReadCommand cmd : cmds) |
| if (!SchemaConstants.isLocalSystemKeyspace(cmd.metadata().keyspace)) |
| return false; |
| return true; |
| } |
| |
| public static RowIterator readOne(SinglePartitionReadCommand command, ConsistencyLevel consistencyLevel, long queryStartNanoTime) |
| throws UnavailableException, IsBootstrappingException, ReadFailureException, ReadTimeoutException, InvalidRequestException |
| { |
| return PartitionIterators.getOnlyElement(read(SinglePartitionReadCommand.Group.one(command), consistencyLevel, queryStartNanoTime), command); |
| } |
| |
| /** |
| * Performs the actual reading of a row out of the StorageService, fetching |
| * a specific set of column names from a given column family. |
| */ |
| public static PartitionIterator read(SinglePartitionReadCommand.Group group, ConsistencyLevel consistencyLevel, long queryStartNanoTime) |
| throws UnavailableException, IsBootstrappingException, ReadFailureException, ReadTimeoutException, InvalidRequestException |
| { |
| if (StorageService.instance.isBootstrapMode() && !systemKeyspaceQuery(group.queries)) |
| { |
| readMetrics.unavailables.mark(); |
| readMetricsForLevel(consistencyLevel).unavailables.mark(); |
| IsBootstrappingException exception = new IsBootstrappingException(); |
| logRequestException(exception, group.queries); |
| throw exception; |
| } |
| |
| if (DatabaseDescriptor.getPartitionDenylistEnabled() && DatabaseDescriptor.getDenylistReadsEnabled()) |
| { |
| for (SinglePartitionReadCommand command : group.queries) |
| { |
| if (!partitionDenylist.isKeyPermitted(command.metadata().id, command.partitionKey().getKey())) |
| { |
| denylistMetrics.incrementReadsRejected(); |
| throw new InvalidRequestException(String.format("Unable to read denylisted partition [0x%s] in %s/%s", |
| command.partitionKey().toString(), command.metadata().keyspace, command.metadata().name)); |
| } |
| } |
| } |
| |
| return consistencyLevel.isSerialConsistency() |
| ? readWithPaxos(group, consistencyLevel, queryStartNanoTime) |
| : readRegular(group, consistencyLevel, queryStartNanoTime); |
| } |
| |
| private static PartitionIterator readWithPaxos(SinglePartitionReadCommand.Group group, ConsistencyLevel consistencyLevel, long queryStartNanoTime) |
| throws InvalidRequestException, UnavailableException, ReadFailureException, ReadTimeoutException |
| { |
| return Paxos.useV2() |
| ? Paxos.read(group, consistencyLevel) |
| : legacyReadWithPaxos(group, consistencyLevel, queryStartNanoTime); |
| } |
| |
| private static PartitionIterator legacyReadWithPaxos(SinglePartitionReadCommand.Group group, ConsistencyLevel consistencyLevel, long queryStartNanoTime) |
| throws InvalidRequestException, UnavailableException, ReadFailureException, ReadTimeoutException |
| { |
| if (group.queries.size() > 1) |
| throw new InvalidRequestException("SERIAL/LOCAL_SERIAL consistency may only be requested for one partition at a time"); |
| |
| long start = nanoTime(); |
| SinglePartitionReadCommand command = group.queries.get(0); |
| TableMetadata metadata = command.metadata(); |
| DecoratedKey key = command.partitionKey(); |
| // calculate the blockFor before repair any paxos round to avoid RS being altered in between. |
| int blockForRead = consistencyLevel.blockFor(Keyspace.open(metadata.keyspace).getReplicationStrategy()); |
| |
| PartitionIterator result = null; |
| try |
| { |
| final ConsistencyLevel consistencyForReplayCommitsOrFetch = consistencyLevel == ConsistencyLevel.LOCAL_SERIAL |
| ? ConsistencyLevel.LOCAL_QUORUM |
| : ConsistencyLevel.QUORUM; |
| |
| try |
| { |
| // Commit an empty update to make sure all in-progress updates that should be finished first is, _and_ |
| // that no other in-progress can get resurrected. |
| Function<Ballot, Pair<PartitionUpdate, RowIterator>> updateProposer = |
| !Paxos.isLinearizable() |
| ? ballot -> null |
| : ballot -> Pair.create(PartitionUpdate.emptyUpdate(metadata, key), null); |
| // When replaying, we commit at quorum/local quorum, as we want to be sure the following read (done at |
| // quorum/local_quorum) sees any replayed updates. Our own update is however empty, and those don't even |
| // get committed due to an optimiation described in doPaxos/beingRepairAndPaxos, so the commit |
| // consistency is irrelevant (we use ANY just to emphasis that we don't wait on our commit). |
| doPaxos(metadata, |
| key, |
| consistencyLevel, |
| consistencyForReplayCommitsOrFetch, |
| ConsistencyLevel.ANY, |
| start, |
| casReadMetrics, |
| updateProposer); |
| } |
| catch (WriteTimeoutException e) |
| { |
| throw new ReadTimeoutException(consistencyLevel, 0, blockForRead, false); |
| } |
| catch (WriteFailureException e) |
| { |
| throw new ReadFailureException(consistencyLevel, e.received, e.blockFor, false, e.failureReasonByEndpoint); |
| } |
| |
| result = fetchRows(group.queries, consistencyForReplayCommitsOrFetch, queryStartNanoTime); |
| } |
| catch (UnavailableException e) |
| { |
| readMetrics.unavailables.mark(); |
| casReadMetrics.unavailables.mark(); |
| readMetricsForLevel(consistencyLevel).unavailables.mark(); |
| logRequestException(e, group.queries); |
| throw e; |
| } |
| catch (ReadTimeoutException e) |
| { |
| readMetrics.timeouts.mark(); |
| casReadMetrics.timeouts.mark(); |
| readMetricsForLevel(consistencyLevel).timeouts.mark(); |
| logRequestException(e, group.queries); |
| throw e; |
| } |
| catch (ReadAbortException e) |
| { |
| readMetrics.markAbort(e); |
| casReadMetrics.markAbort(e); |
| readMetricsForLevel(consistencyLevel).markAbort(e); |
| throw e; |
| } |
| catch (ReadFailureException e) |
| { |
| readMetrics.failures.mark(); |
| casReadMetrics.failures.mark(); |
| readMetricsForLevel(consistencyLevel).failures.mark(); |
| throw e; |
| } |
| finally |
| { |
| long latency = nanoTime() - start; |
| readMetrics.addNano(latency); |
| casReadMetrics.addNano(latency); |
| readMetricsForLevel(consistencyLevel).addNano(latency); |
| Keyspace.open(metadata.keyspace).getColumnFamilyStore(metadata.name).metric.coordinatorReadLatency.update(latency, TimeUnit.NANOSECONDS); |
| } |
| |
| return result; |
| } |
| |
| @SuppressWarnings("resource") |
| private static PartitionIterator readRegular(SinglePartitionReadCommand.Group group, ConsistencyLevel consistencyLevel, long queryStartNanoTime) |
| throws UnavailableException, ReadFailureException, ReadTimeoutException |
| { |
| long start = nanoTime(); |
| try |
| { |
| PartitionIterator result = fetchRows(group.queries, consistencyLevel, queryStartNanoTime); |
| // Note that the only difference between the command in a group must be the partition key on which |
| // they applied. |
| boolean enforceStrictLiveness = group.queries.get(0).metadata().enforceStrictLiveness(); |
| // If we have more than one command, then despite each read command honoring the limit, the total result |
| // might not honor it and so we should enforce it |
| if (group.queries.size() > 1) |
| result = group.limits().filter(result, group.nowInSec(), group.selectsFullPartition(), enforceStrictLiveness); |
| return result; |
| } |
| catch (UnavailableException e) |
| { |
| readMetrics.unavailables.mark(); |
| readMetricsForLevel(consistencyLevel).unavailables.mark(); |
| logRequestException(e, group.queries); |
| throw e; |
| } |
| catch (ReadTimeoutException e) |
| { |
| readMetrics.timeouts.mark(); |
| readMetricsForLevel(consistencyLevel).timeouts.mark(); |
| logRequestException(e, group.queries); |
| throw e; |
| } |
| catch (ReadAbortException e) |
| { |
| recordReadRegularAbort(consistencyLevel, e); |
| throw e; |
| } |
| catch (ReadFailureException e) |
| { |
| readMetrics.failures.mark(); |
| readMetricsForLevel(consistencyLevel).failures.mark(); |
| throw e; |
| } |
| finally |
| { |
| long latency = nanoTime() - start; |
| readMetrics.addNano(latency); |
| readMetricsForLevel(consistencyLevel).addNano(latency); |
| // TODO avoid giving every command the same latency number. Can fix this in CASSADRA-5329 |
| for (ReadCommand command : group.queries) |
| Keyspace.openAndGetStore(command.metadata()).metric.coordinatorReadLatency.update(latency, TimeUnit.NANOSECONDS); |
| } |
| } |
| |
/**
 * Records an aborted read against both the global read metrics and the read metrics of the given
 * consistency level.
 *
 * @param consistencyLevel the consistency level whose metrics are updated
 * @param cause the reason for the abort, passed to markAbort
 */
public static void recordReadRegularAbort(ConsistencyLevel consistencyLevel, Throwable cause)
{
    readMetrics.markAbort(cause);
    readMetricsForLevel(consistencyLevel).markAbort(cause);
}
| |
| public static PartitionIterator concatAndBlockOnRepair(List<PartitionIterator> iterators, List<ReadRepair<?, ?>> repairs) |
| { |
| PartitionIterator concatenated = PartitionIterators.concat(iterators); |
| |
| if (repairs.isEmpty()) |
| return concatenated; |
| |
| return new PartitionIterator() |
| { |
| public void close() |
| { |
| concatenated.close(); |
| repairs.forEach(ReadRepair::maybeSendAdditionalWrites); |
| repairs.forEach(ReadRepair::awaitWrites); |
| } |
| |
| public boolean hasNext() |
| { |
| return concatenated.hasNext(); |
| } |
| |
| public RowIterator next() |
| { |
| return concatenated.next(); |
| } |
| }; |
| } |
| |
| /** |
| * This function executes local and remote reads, and blocks for the results: |
| * |
| * 1. Get the replica locations, sorted by response time according to the snitch |
| * 2. Send a data request to the closest replica, and digest requests to either |
| * a) all the replicas, if read repair is enabled |
| * b) the closest R-1 replicas, where R is the number required to satisfy the ConsistencyLevel |
| * 3. Wait for a response from R replicas |
| * 4. If the digests (if any) match the data return the data |
| * 5. else carry out read repair by getting data from all the nodes. |
| */ |
| private static PartitionIterator fetchRows(List<SinglePartitionReadCommand> commands, ConsistencyLevel consistencyLevel, long queryStartNanoTime) |
| throws UnavailableException, ReadFailureException, ReadTimeoutException |
| { |
| int cmdCount = commands.size(); |
| |
| AbstractReadExecutor[] reads = new AbstractReadExecutor[cmdCount]; |
| |
| // Get the replica locations, sorted by response time according to the snitch, and create a read executor |
| // for type of speculation we'll use in this read |
| for (int i=0; i<cmdCount; i++) |
| { |
| reads[i] = AbstractReadExecutor.getReadExecutor(commands.get(i), consistencyLevel, queryStartNanoTime); |
| |
| if (reads[i].hasLocalRead()) |
| readMetrics.localRequests.mark(); |
| else |
| readMetrics.remoteRequests.mark(); |
| } |
| |
| // sends a data request to the closest replica, and a digest request to the others. If we have a speculating |
| // read executoe, we'll only send read requests to enough replicas to satisfy the consistency level |
| for (int i=0; i<cmdCount; i++) |
| { |
| reads[i].executeAsync(); |
| } |
| |
| // if we have a speculating read executor and it looks like we may not receive a response from the initial |
| // set of replicas we sent messages to, speculatively send an additional messages to an un-contacted replica |
| for (int i=0; i<cmdCount; i++) |
| { |
| reads[i].maybeTryAdditionalReplicas(); |
| } |
| |
| // wait for enough responses to meet the consistency level. If there's a digest mismatch, begin the read |
| // repair process by sending full data reads to all replicas we received responses from. |
| boolean logBlockingRepairAttempts = instance.isLoggingReadRepairs(); |
| for (int i=0; i<cmdCount; i++) |
| { |
| reads[i].awaitResponses(logBlockingRepairAttempts); |
| } |
| |
| // read repair - if it looks like we may not receive enough full data responses to meet CL, send |
| // an additional request to any remaining replicas we haven't contacted (if there are any) |
| for (int i=0; i<cmdCount; i++) |
| { |
| reads[i].maybeSendAdditionalDataRequests(); |
| } |
| |
| // read repair - block on full data responses |
| for (int i=0; i<cmdCount; i++) |
| { |
| reads[i].awaitReadRepair(); |
| } |
| |
| // if we didn't do a read repair, return the contents of the data response, if we did do a read |
| // repair, merge the full data reads |
| List<PartitionIterator> results = new ArrayList<>(cmdCount); |
| List<ReadRepair<?, ?>> repairs = new ArrayList<>(cmdCount); |
| for (int i=0; i<cmdCount; i++) |
| { |
| results.add(reads[i].getResult()); |
| repairs.add(reads[i].getReadRepair()); |
| } |
| |
| // if we did a read repair, assemble repair mutation and block on them |
| return concatAndBlockOnRepair(results, repairs); |
| } |
| |
| public static class LocalReadRunnable extends DroppableRunnable |
| { |
| private final ReadCommand command; |
| private final ReadCallback handler; |
| private final boolean trackRepairedStatus; |
| |
| public LocalReadRunnable(ReadCommand command, ReadCallback handler) |
| { |
| this(command, handler, false); |
| } |
| |
| public LocalReadRunnable(ReadCommand command, ReadCallback handler, boolean trackRepairedStatus) |
| { |
| super(Verb.READ_REQ); |
| this.command = command; |
| this.handler = handler; |
| this.trackRepairedStatus = trackRepairedStatus; |
| } |
| |
| protected void runMayThrow() |
| { |
| try |
| { |
| MessageParams.reset(); |
| |
| boolean readRejected = false; |
| command.setMonitoringTime(approxCreationTimeNanos, false, verb.expiresAfterNanos(), DatabaseDescriptor.getSlowQueryTimeout(NANOSECONDS)); |
| |
| ReadResponse response; |
| try (ReadExecutionController controller = command.executionController(trackRepairedStatus); |
| UnfilteredPartitionIterator iterator = command.executeLocally(controller)) |
| { |
| response = command.createResponse(iterator, controller.getRepairedDataInfo()); |
| } |
| catch (RejectException e) |
| { |
| if (!command.isTrackingWarnings()) |
| throw e; |
| |
| response = command.createEmptyResponse(); |
| readRejected = true; |
| } |
| |
| if (command.complete()) |
| { |
| handler.response(response); |
| } |
| else |
| { |
| MessagingService.instance().metrics.recordSelfDroppedMessage(verb, MonotonicClock.Global.approxTime.now() - approxCreationTimeNanos, NANOSECONDS); |
| handler.onFailure(FBUtilities.getBroadcastAddressAndPort(), RequestFailureReason.UNKNOWN); |
| } |
| |
| if (!readRejected) |
| MessagingService.instance().latencySubscribers.add(FBUtilities.getBroadcastAddressAndPort(), MonotonicClock.Global.approxTime.now() - approxCreationTimeNanos, NANOSECONDS); |
| } |
| catch (Throwable t) |
| { |
| if (t instanceof TombstoneOverwhelmingException) |
| { |
| handler.onFailure(FBUtilities.getBroadcastAddressAndPort(), RequestFailureReason.READ_TOO_MANY_TOMBSTONES); |
| logger.error(t.getMessage()); |
| } |
| else |
| { |
| handler.onFailure(FBUtilities.getBroadcastAddressAndPort(), RequestFailureReason.UNKNOWN); |
| throw t; |
| } |
| } |
| } |
| } |
| |
| public static PartitionIterator getRangeSlice(PartitionRangeReadCommand command, |
| ConsistencyLevel consistencyLevel, |
| long queryStartNanoTime) |
| { |
| if (DatabaseDescriptor.getPartitionDenylistEnabled() && DatabaseDescriptor.getDenylistRangeReadsEnabled()) |
| { |
| final int denylisted = partitionDenylist.getDeniedKeysInRangeCount(command.metadata().id, command.dataRange().keyRange()); |
| if (denylisted > 0) |
| { |
| denylistMetrics.incrementRangeReadsRejected(); |
| String tokens = command.loggableTokens(); |
| throw new InvalidRequestException(String.format("Attempted to read a range containing %d denylisted keys in %s/%s." + |
| " Range read: %s", denylisted, command.metadata().keyspace, command.metadata().name, |
| tokens)); |
| } |
| } |
| return RangeCommands.partitions(command, consistencyLevel, queryStartNanoTime); |
| } |
| |
| public Map<String, List<String>> getSchemaVersions() |
| { |
| return describeSchemaVersions(false); |
| } |
| |
| public Map<String, List<String>> getSchemaVersionsWithPort() |
| { |
| return describeSchemaVersions(true); |
| } |
| |
| /** |
| * initiate a request/response session with each live node to check whether or not everybody is using the same |
| * migration id. This is useful for determining if a schema change has propagated through the cluster. Disagreement |
| * is assumed if any node fails to respond. |
| */ |
| public static Map<String, List<String>> describeSchemaVersions(boolean withPort) |
| { |
| final String myVersion = Schema.instance.getVersion().toString(); |
| final Map<InetAddressAndPort, UUID> versions = new ConcurrentHashMap<>(); |
| final Set<InetAddressAndPort> liveHosts = Gossiper.instance.getLiveMembers(); |
| final CountDownLatch latch = newCountDownLatch(liveHosts.size()); |
| |
| RequestCallback<UUID> cb = message -> |
| { |
| // record the response from the remote node. |
| versions.put(message.from(), message.payload); |
| latch.decrement(); |
| }; |
| // an empty message acts as a request to the SchemaVersionVerbHandler. |
| Message message = out(SCHEMA_VERSION_REQ, noPayload); |
| for (InetAddressAndPort endpoint : liveHosts) |
| MessagingService.instance().sendWithCallback(message, endpoint, cb); |
| |
| try |
| { |
| // wait for as long as possible. timeout-1s if possible. |
| latch.await(DatabaseDescriptor.getRpcTimeout(NANOSECONDS), NANOSECONDS); |
| } |
| catch (InterruptedException e) |
| { |
| throw new UncheckedInterruptedException(e); |
| } |
| |
| // maps versions to hosts that are on that version. |
| Map<String, List<String>> results = new HashMap<String, List<String>>(); |
| Iterable<InetAddressAndPort> allHosts = concat(Gossiper.instance.getLiveMembers(), Gossiper.instance.getUnreachableMembers()); |
| for (InetAddressAndPort host : allHosts) |
| { |
| UUID version = versions.get(host); |
| String stringVersion = version == null ? UNREACHABLE : version.toString(); |
| List<String> hosts = results.get(stringVersion); |
| if (hosts == null) |
| { |
| hosts = new ArrayList<String>(); |
| results.put(stringVersion, hosts); |
| } |
| hosts.add(host.getHostAddress(withPort)); |
| } |
| |
| // we're done: the results map is ready to return to the client. the rest is just debug logging: |
| if (results.get(UNREACHABLE) != null) |
| logger.debug("Hosts not in agreement. Didn't get a response from everybody: {}", join(results.get(UNREACHABLE), ",")); |
| for (Map.Entry<String, List<String>> entry : results.entrySet()) |
| { |
| // check for version disagreement. log the hosts that don't agree. |
| if (entry.getKey().equals(UNREACHABLE) || entry.getKey().equals(myVersion)) |
| continue; |
| for (String host : entry.getValue()) |
| logger.debug("{} disagrees ({})", host, entry.getKey()); |
| } |
| if (results.size() == 1) |
| logger.debug("Schemas are in agreement."); |
| |
| return results; |
| } |
| |
| public boolean getHintedHandoffEnabled() |
| { |
| return DatabaseDescriptor.hintedHandoffEnabled(); |
| } |
| |
| public void setHintedHandoffEnabled(boolean b) |
| { |
| synchronized (StorageService.instance) |
| { |
| if (b) |
| StorageService.instance.checkServiceAllowedToStart("hinted handoff"); |
| |
| DatabaseDescriptor.setHintedHandoffEnabled(b); |
| } |
| } |
| |
| public void enableHintsForDC(String dc) |
| { |
| DatabaseDescriptor.enableHintsForDC(dc); |
| } |
| |
| public void disableHintsForDC(String dc) |
| { |
| DatabaseDescriptor.disableHintsForDC(dc); |
| } |
| |
| public Set<String> getHintedHandoffDisabledDCs() |
| { |
| return DatabaseDescriptor.hintedHandoffDisabledDCs(); |
| } |
| |
| public int getMaxHintWindow() |
| { |
| return DatabaseDescriptor.getMaxHintWindow(); |
| } |
| |
| public void setMaxHintWindow(int ms) |
| { |
| DatabaseDescriptor.setMaxHintWindow(ms); |
| } |
| |
| public int getMaxHintsSizePerHostInMiB() |
| { |
| return DatabaseDescriptor.getMaxHintsSizePerHostInMiB(); |
| } |
| |
| public void setMaxHintsSizePerHostInMiB(int value) |
| { |
| DatabaseDescriptor.setMaxHintsSizePerHostInMiB(value); |
| } |
| |
| public static boolean shouldHint(Replica replica) |
| { |
| return shouldHint(replica, true); |
| } |
| |
| /** |
| * Determines whether a hint should be stored or not. |
| * It rejects early if any of the condition is met: |
| * - Hints disabled entirely or for the belonging datacetner of the replica |
| * - The replica is transient or is the self node |
| * - The replica is no longer part of the ring |
| * - The hint window has expired |
| * - The hints have reached to the size limit for the node |
| * Otherwise, it permits. |
| * |
| * @param replica, the replica for the hint |
| * @param tryEnablePersistentWindow, true to consider hint_window_persistent_enabled; otherwise, ignores |
| * @return true to permit or false to reject hint |
| */ |
| public static boolean shouldHint(Replica replica, boolean tryEnablePersistentWindow) |
| { |
| if (!DatabaseDescriptor.hintedHandoffEnabled() |
| || replica.isTransient() |
| || replica.isSelf()) |
| return false; |
| |
| Set<String> disabledDCs = DatabaseDescriptor.hintedHandoffDisabledDCs(); |
| if (!disabledDCs.isEmpty()) |
| { |
| final String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(replica); |
| if (disabledDCs.contains(dc)) |
| { |
| Tracing.trace("Not hinting {} since its data center {} has been disabled {}", replica, dc, disabledDCs); |
| return false; |
| } |
| } |
| |
| InetAddressAndPort endpoint = replica.endpoint(); |
| int maxHintWindow = DatabaseDescriptor.getMaxHintWindow(); |
| long endpointDowntime = Gossiper.instance.getEndpointDowntime(endpoint); |
| boolean hintWindowExpired = endpointDowntime > maxHintWindow; |
| |
| UUID hostIdForEndpoint = StorageService.instance.getHostIdForEndpoint(endpoint); |
| if (hostIdForEndpoint == null) |
| { |
| Tracing.trace("Discarding hint for endpoint not part of ring: {}", endpoint); |
| return false; |
| } |
| |
| // if persisting hints window, hintWindowExpired might be updated according to the timestamp of the earliest hint |
| if (tryEnablePersistentWindow && !hintWindowExpired && DatabaseDescriptor.hintWindowPersistentEnabled()) |
| { |
| long earliestHint = HintsService.instance.getEarliestHintForHost(hostIdForEndpoint); |
| hintWindowExpired = Clock.Global.currentTimeMillis() - maxHintWindow > earliestHint; |
| if (hintWindowExpired) |
| Tracing.trace("Not hinting {} for which there is the earliest hint stored at {}", replica, earliestHint); |
| } |
| |
| if (hintWindowExpired) |
| { |
| HintsService.instance.metrics.incrPastWindow(endpoint); |
| Tracing.trace("Not hinting {} which has been down {} ms", endpoint, endpointDowntime); |
| return false; |
| } |
| |
| long maxHintsSize = DatabaseDescriptor.getMaxHintsSizePerHost(); |
| long actualTotalHintsSize = HintsService.instance.getTotalHintsSize(hostIdForEndpoint); |
| boolean hasHintsReachedMaxSize = maxHintsSize > 0 && actualTotalHintsSize > maxHintsSize; |
| if (hasHintsReachedMaxSize) |
| { |
| Tracing.trace("Not hinting {} which has reached to the max hints size {} bytes on disk. The actual hints size on disk: {}", |
| endpoint, maxHintsSize, actualTotalHintsSize); |
| return false; |
| } |
| |
| return true; |
| } |
| |
| /** |
| * Performs the truncate operatoin, which effectively deletes all data from |
| * the column family cfname |
| * @param keyspace |
| * @param cfname |
| * @throws UnavailableException If some of the hosts in the ring are down. |
| * @throws TimeoutException |
| */ |
| public static void truncateBlocking(String keyspace, String cfname) throws UnavailableException, TimeoutException |
| { |
| logger.debug("Starting a blocking truncate operation on keyspace {}, CF {}", keyspace, cfname); |
| if (isAnyStorageHostDown()) |
| { |
| logger.info("Cannot perform truncate, some hosts are down"); |
| // Since the truncate operation is so aggressive and is typically only |
| // invoked by an admin, for simplicity we require that all nodes are up |
| // to perform the operation. |
| int liveMembers = Gossiper.instance.getLiveMembers().size(); |
| throw UnavailableException.create(ConsistencyLevel.ALL, liveMembers + Gossiper.instance.getUnreachableMembers().size(), liveMembers); |
| } |
| |
| Set<InetAddressAndPort> allEndpoints = StorageService.instance.getLiveRingMembers(true); |
| |
| int blockFor = allEndpoints.size(); |
| final TruncateResponseHandler responseHandler = new TruncateResponseHandler(blockFor); |
| |
| // Send out the truncate calls and track the responses with the callbacks. |
| Tracing.trace("Enqueuing truncate messages to hosts {}", allEndpoints); |
| Message<TruncateRequest> message = Message.out(TRUNCATE_REQ, new TruncateRequest(keyspace, cfname)); |
| for (InetAddressAndPort endpoint : allEndpoints) |
| MessagingService.instance().sendWithCallback(message, endpoint, responseHandler); |
| |
| // Wait for all |
| try |
| { |
| responseHandler.get(); |
| } |
| catch (TimeoutException e) |
| { |
| Tracing.trace("Timed out"); |
| throw e; |
| } |
| } |
| |
| /** |
| * Asks the gossiper if there are any nodes that are currently down. |
| * @return true if the gossiper thinks all nodes are up. |
| */ |
| private static boolean isAnyStorageHostDown() |
| { |
| return !Gossiper.instance.getUnreachableTokenOwners().isEmpty(); |
| } |
| |
| public interface WritePerformer |
| { |
| public void apply(IMutation mutation, |
| ReplicaPlan.ForWrite targets, |
| AbstractWriteResponseHandler<IMutation> responseHandler, |
| String localDataCenter) throws OverloadedException; |
| } |
| |
| /** |
| * This class captures metrics for views writes. |
| */ |
| private static class ViewWriteMetricsWrapped extends BatchlogResponseHandler<IMutation> |
| { |
| public ViewWriteMetricsWrapped(AbstractWriteResponseHandler<IMutation> writeHandler, int i, BatchlogCleanup cleanup, long queryStartNanoTime) |
| { |
| super(writeHandler, i, cleanup, queryStartNanoTime); |
| viewWriteMetrics.viewReplicasAttempted.inc(candidateReplicaCount()); |
| } |
| |
| public void onResponse(Message<IMutation> msg) |
| { |
| super.onResponse(msg); |
| viewWriteMetrics.viewReplicasSuccess.inc(); |
| } |
| } |
| |
| /** |
| * A Runnable that aborts if it doesn't start running before it times out |
| */ |
| private static abstract class DroppableRunnable implements Runnable |
| { |
| final long approxCreationTimeNanos; |
| final Verb verb; |
| |
| public DroppableRunnable(Verb verb) |
| { |
| this.approxCreationTimeNanos = MonotonicClock.Global.approxTime.now(); |
| this.verb = verb; |
| } |
| |
| public final void run() |
| { |
| long approxCurrentTimeNanos = MonotonicClock.Global.approxTime.now(); |
| long expirationTimeNanos = verb.expiresAtNanos(approxCreationTimeNanos); |
| if (approxCurrentTimeNanos > expirationTimeNanos) |
| { |
| long timeTakenNanos = approxCurrentTimeNanos - approxCreationTimeNanos; |
| MessagingService.instance().metrics.recordSelfDroppedMessage(verb, timeTakenNanos, NANOSECONDS); |
| return; |
| } |
| try |
| { |
| runMayThrow(); |
| } |
| catch (Exception e) |
| { |
| throw new RuntimeException(e); |
| } |
| } |
| |
| abstract protected void runMayThrow() throws Exception; |
| } |
| |
| /** |
| * Like DroppableRunnable, but if it aborts, it will rerun (on the mutation stage) after |
| * marking itself as a hint in progress so that the hint backpressure mechanism can function. |
| */ |
| private static abstract class LocalMutationRunnable implements Runnable |
| { |
| private final long approxCreationTimeNanos = MonotonicClock.Global.approxTime.now(); |
| |
| private final Replica localReplica; |
| |
| LocalMutationRunnable(Replica localReplica) |
| { |
| this.localReplica = localReplica; |
| } |
| |
| public final void run() |
| { |
| final Verb verb = verb(); |
| long nowNanos = MonotonicClock.Global.approxTime.now(); |
| long expirationTimeNanos = verb.expiresAtNanos(approxCreationTimeNanos); |
| if (nowNanos > expirationTimeNanos) |
| { |
| long timeTakenNanos = nowNanos - approxCreationTimeNanos; |
| MessagingService.instance().metrics.recordSelfDroppedMessage(Verb.MUTATION_REQ, timeTakenNanos, NANOSECONDS); |
| |
| HintRunnable runnable = new HintRunnable(EndpointsForToken.of(localReplica.range().right, localReplica)) |
| { |
| protected void runMayThrow() throws Exception |
| { |
| LocalMutationRunnable.this.runMayThrow(); |
| } |
| }; |
| submitHint(runnable); |
| return; |
| } |
| |
| try |
| { |
| runMayThrow(); |
| } |
| catch (Exception e) |
| { |
| throw new RuntimeException(e); |
| } |
| } |
| |
| abstract protected Verb verb(); |
| abstract protected void runMayThrow() throws Exception; |
| } |
| |
| public static void logRequestException(Exception exception, Collection<? extends ReadCommand> commands) |
| { |
| NoSpamLogger.log(logger, NoSpamLogger.Level.INFO, FAILURE_LOGGING_INTERVAL_SECONDS, TimeUnit.SECONDS, |
| "\"{}\" while executing {}", |
| () -> new Object[] |
| { |
| exception.getMessage(), |
| commands.stream().map(ReadCommand::toCQLString).collect(Collectors.joining("; ")) |
| }); |
| } |
| |
| /** |
| * HintRunnable will decrease totalHintsInProgress and targetHints when finished. |
| * It is the caller's responsibility to increment them initially. |
| */ |
| private abstract static class HintRunnable implements Runnable |
| { |
| public final EndpointsForToken targets; |
| |
| protected HintRunnable(EndpointsForToken targets) |
| { |
| this.targets = targets; |
| } |
| |
| public void run() |
| { |
| try |
| { |
| runMayThrow(); |
| } |
| catch (Exception e) |
| { |
| throw new RuntimeException(e); |
| } |
| finally |
| { |
| StorageMetrics.totalHintsInProgress.dec(targets.size()); |
| for (InetAddressAndPort target : targets.endpoints()) |
| getHintsInProgressFor(target).decrementAndGet(); |
| } |
| } |
| |
| abstract protected void runMayThrow() throws Exception; |
| } |
| |
| public long getTotalHints() |
| { |
| return StorageMetrics.totalHints.getCount(); |
| } |
| |
| public int getMaxHintsInProgress() |
| { |
| return maxHintsInProgress; |
| } |
| |
| public void setMaxHintsInProgress(int qs) |
| { |
| maxHintsInProgress = qs; |
| } |
| |
| public int getHintsInProgress() |
| { |
| return (int) StorageMetrics.totalHintsInProgress.getCount(); |
| } |
| |
| public void verifyNoHintsInProgress() |
| { |
| if (getHintsInProgress() > 0) |
| logger.warn("Some hints were not written before shutdown. This is not supposed to happen. You should (a) run repair, and (b) file a bug report"); |
| } |
| |
| private static AtomicInteger getHintsInProgressFor(InetAddressAndPort destination) |
| { |
| try |
| { |
| return hintsInProgress.load(destination); |
| } |
| catch (Exception e) |
| { |
| throw new AssertionError(e); |
| } |
| } |
| |
| public static Future<Void> submitHint(Mutation mutation, Replica target, AbstractWriteResponseHandler<IMutation> responseHandler) |
| { |
| return submitHint(mutation, EndpointsForToken.of(target.range().right, target), responseHandler); |
| } |
| |
| public static Future<Void> submitHint(Mutation mutation, |
| EndpointsForToken targets, |
| AbstractWriteResponseHandler<IMutation> responseHandler) |
| { |
| Replicas.assertFull(targets); // hints should not be written for transient replicas |
| HintRunnable runnable = new HintRunnable(targets) |
| { |
| public void runMayThrow() |
| { |
| Set<InetAddressAndPort> validTargets = new HashSet<>(targets.size()); |
| Set<UUID> hostIds = new HashSet<>(targets.size()); |
| for (InetAddressAndPort target : targets.endpoints()) |
| { |
| UUID hostId = StorageService.instance.getHostIdForEndpoint(target); |
| if (hostId != null) |
| { |
| hostIds.add(hostId); |
| validTargets.add(target); |
| } |
| else |
| logger.debug("Discarding hint for endpoint not part of ring: {}", target); |
| } |
| logger.trace("Adding hints for {}", validTargets); |
| HintsService.instance.write(hostIds, Hint.create(mutation, currentTimeMillis())); |
| validTargets.forEach(HintsService.instance.metrics::incrCreatedHints); |
| // Notify the handler only for CL == ANY |
| if (responseHandler != null && responseHandler.replicaPlan.consistencyLevel() == ConsistencyLevel.ANY) |
| responseHandler.onResponse(null); |
| } |
| }; |
| |
| return submitHint(runnable); |
| } |
| |
| private static Future<Void> submitHint(HintRunnable runnable) |
| { |
| StorageMetrics.totalHintsInProgress.inc(runnable.targets.size()); |
| for (Replica target : runnable.targets) |
| getHintsInProgressFor(target.endpoint()).incrementAndGet(); |
| return (Future<Void>) Stage.MUTATION.submit(runnable); |
| } |
| |
| public Long getRpcTimeout() { return DatabaseDescriptor.getRpcTimeout(MILLISECONDS); } |
| public void setRpcTimeout(Long timeoutInMillis) { DatabaseDescriptor.setRpcTimeout(timeoutInMillis); } |
| |
| public Long getReadRpcTimeout() { return DatabaseDescriptor.getReadRpcTimeout(MILLISECONDS); } |
| public void setReadRpcTimeout(Long timeoutInMillis) { DatabaseDescriptor.setReadRpcTimeout(timeoutInMillis); } |
| |
| public Long getWriteRpcTimeout() { return DatabaseDescriptor.getWriteRpcTimeout(MILLISECONDS); } |
| public void setWriteRpcTimeout(Long timeoutInMillis) { DatabaseDescriptor.setWriteRpcTimeout(timeoutInMillis); } |
| |
| public Long getCounterWriteRpcTimeout() { return DatabaseDescriptor.getCounterWriteRpcTimeout(MILLISECONDS); } |
| public void setCounterWriteRpcTimeout(Long timeoutInMillis) { DatabaseDescriptor.setCounterWriteRpcTimeout(timeoutInMillis); } |
| |
| public Long getCasContentionTimeout() { return DatabaseDescriptor.getCasContentionTimeout(MILLISECONDS); } |
| public void setCasContentionTimeout(Long timeoutInMillis) { DatabaseDescriptor.setCasContentionTimeout(timeoutInMillis); } |
| |
| public Long getRangeRpcTimeout() { return DatabaseDescriptor.getRangeRpcTimeout(MILLISECONDS); } |
| public void setRangeRpcTimeout(Long timeoutInMillis) { DatabaseDescriptor.setRangeRpcTimeout(timeoutInMillis); } |
| |
| public Long getTruncateRpcTimeout() { return DatabaseDescriptor.getTruncateRpcTimeout(MILLISECONDS); } |
| public void setTruncateRpcTimeout(Long timeoutInMillis) { DatabaseDescriptor.setTruncateRpcTimeout(timeoutInMillis); } |
| |
| public Long getNativeTransportMaxConcurrentConnections() { return DatabaseDescriptor.getNativeTransportMaxConcurrentConnections(); } |
| public void setNativeTransportMaxConcurrentConnections(Long nativeTransportMaxConcurrentConnections) { DatabaseDescriptor.setNativeTransportMaxConcurrentConnections(nativeTransportMaxConcurrentConnections); } |
| |
| public Long getNativeTransportMaxConcurrentConnectionsPerIp() { return DatabaseDescriptor.getNativeTransportMaxConcurrentConnectionsPerIp(); } |
| public void setNativeTransportMaxConcurrentConnectionsPerIp(Long nativeTransportMaxConcurrentConnections) { DatabaseDescriptor.setNativeTransportMaxConcurrentConnectionsPerIp(nativeTransportMaxConcurrentConnections); } |
| |
| public void reloadTriggerClasses() { TriggerExecutor.instance.reloadClasses(); } |
| |
| public long getReadRepairAttempted() |
| { |
| return ReadRepairMetrics.attempted.getCount(); |
| } |
| |
| public long getReadRepairRepairedBlocking() |
| { |
| return ReadRepairMetrics.repairedBlocking.getCount(); |
| } |
| |
| public long getReadRepairRepairedBackground() |
| { |
| return ReadRepairMetrics.repairedBackground.getCount(); |
| } |
| |
| public long getReadRepairRepairTimedOut() |
| { |
| return ReadRepairMetrics.timedOut.getCount(); |
| } |
| |
| public int getNumberOfTables() |
| { |
| return Schema.instance.getNumberOfTables(); |
| } |
| |
| public String getIdealConsistencyLevel() |
| { |
| return Objects.toString(DatabaseDescriptor.getIdealConsistencyLevel(), ""); |
| } |
| |
| public String setIdealConsistencyLevel(String cl) |
| { |
| ConsistencyLevel original = DatabaseDescriptor.getIdealConsistencyLevel(); |
| ConsistencyLevel newCL = ConsistencyLevel.valueOf(cl.trim().toUpperCase()); |
| DatabaseDescriptor.setIdealConsistencyLevel(newCL); |
| return String.format("Updating ideal consistency level new value: %s old value %s", newCL, original.toString()); |
| } |
| |
| @Deprecated |
| public int getOtcBacklogExpirationInterval() { |
| return 0; |
| } |
| |
| @Deprecated |
| public void setOtcBacklogExpirationInterval(int intervalInMillis) { } |
| |
| @Override |
| public void enableRepairedDataTrackingForRangeReads() |
| { |
| DatabaseDescriptor.setRepairedDataTrackingForRangeReadsEnabled(true); |
| } |
| |
| @Override |
| public void disableRepairedDataTrackingForRangeReads() |
| { |
| DatabaseDescriptor.setRepairedDataTrackingForRangeReadsEnabled(false); |
| } |
| |
| @Override |
| public boolean getRepairedDataTrackingEnabledForRangeReads() |
| { |
| return DatabaseDescriptor.getRepairedDataTrackingForRangeReadsEnabled(); |
| } |
| |
| @Override |
| public void enableRepairedDataTrackingForPartitionReads() |
| { |
| DatabaseDescriptor.setRepairedDataTrackingForPartitionReadsEnabled(true); |
| } |
| |
| @Override |
| public void disableRepairedDataTrackingForPartitionReads() |
| { |
| DatabaseDescriptor.setRepairedDataTrackingForPartitionReadsEnabled(false); |
| } |
| |
| @Override |
| public boolean getRepairedDataTrackingEnabledForPartitionReads() |
| { |
| return DatabaseDescriptor.getRepairedDataTrackingForPartitionReadsEnabled(); |
| } |
| |
| @Override |
| public void enableReportingUnconfirmedRepairedDataMismatches() |
| { |
| DatabaseDescriptor.reportUnconfirmedRepairedDataMismatches(true); |
| } |
| |
| @Override |
| public void disableReportingUnconfirmedRepairedDataMismatches() |
| { |
| DatabaseDescriptor.reportUnconfirmedRepairedDataMismatches(false); |
| } |
| |
| @Override |
| public boolean getReportingUnconfirmedRepairedDataMismatchesEnabled() |
| { |
| return DatabaseDescriptor.reportUnconfirmedRepairedDataMismatches(); |
| } |
| |
| @Override |
| public boolean getSnapshotOnRepairedDataMismatchEnabled() |
| { |
| return DatabaseDescriptor.snapshotOnRepairedDataMismatch(); |
| } |
| |
| @Override |
| public void enableSnapshotOnRepairedDataMismatch() |
| { |
| DatabaseDescriptor.setSnapshotOnRepairedDataMismatch(true); |
| } |
| |
| @Override |
| public void disableSnapshotOnRepairedDataMismatch() |
| { |
| DatabaseDescriptor.setSnapshotOnRepairedDataMismatch(false); |
| } |
| |
| static class PaxosBallotAndContention |
| { |
| final Ballot ballot; |
| final int contentions; |
| |
| PaxosBallotAndContention(Ballot ballot, int contentions) |
| { |
| this.ballot = ballot; |
| this.contentions = contentions; |
| } |
| |
| @Override |
| public final int hashCode() |
| { |
| int hashCode = 31 + (ballot == null ? 0 : ballot.hashCode()); |
| return 31 * hashCode * this.contentions; |
| } |
| |
| @Override |
| public final boolean equals(Object o) |
| { |
| if(!(o instanceof PaxosBallotAndContention)) |
| return false; |
| PaxosBallotAndContention that = (PaxosBallotAndContention)o; |
| // handles nulls properly |
| return Objects.equals(ballot, that.ballot) && contentions == that.contentions; |
| } |
| } |
| |
| @Override |
| public boolean getSnapshotOnDuplicateRowDetectionEnabled() |
| { |
| return DatabaseDescriptor.snapshotOnDuplicateRowDetection(); |
| } |
| |
| @Override |
| public void enableSnapshotOnDuplicateRowDetection() |
| { |
| DatabaseDescriptor.setSnapshotOnDuplicateRowDetection(true); |
| } |
| |
| @Override |
| public void disableSnapshotOnDuplicateRowDetection() |
| { |
| DatabaseDescriptor.setSnapshotOnDuplicateRowDetection(false); |
| } |
| |
| @Override |
| public boolean getCheckForDuplicateRowsDuringReads() |
| { |
| return DatabaseDescriptor.checkForDuplicateRowsDuringReads(); |
| } |
| |
| @Override |
| public void enableCheckForDuplicateRowsDuringReads() |
| { |
| DatabaseDescriptor.setCheckForDuplicateRowsDuringReads(true); |
| } |
| |
| @Override |
| public void disableCheckForDuplicateRowsDuringReads() |
| { |
| DatabaseDescriptor.setCheckForDuplicateRowsDuringReads(false); |
| } |
| |
| @Override |
| public boolean getCheckForDuplicateRowsDuringCompaction() |
| { |
| return DatabaseDescriptor.checkForDuplicateRowsDuringCompaction(); |
| } |
| |
| @Override |
| public void enableCheckForDuplicateRowsDuringCompaction() |
| { |
| DatabaseDescriptor.setCheckForDuplicateRowsDuringCompaction(true); |
| } |
| |
| @Override |
| public void disableCheckForDuplicateRowsDuringCompaction() |
| { |
| DatabaseDescriptor.setCheckForDuplicateRowsDuringCompaction(false); |
| } |
| |
| public void initialLoadPartitionDenylist() |
| { |
| partitionDenylist.initialLoad(); |
| } |
| |
| @Override |
| public void loadPartitionDenylist() |
| { |
| partitionDenylist.load(); |
| } |
| |
| @Override |
| public int getPartitionDenylistLoadAttempts() |
| { |
| return partitionDenylist.getLoadAttempts(); |
| } |
| |
| @Override |
| public int getPartitionDenylistLoadSuccesses() |
| { |
| return partitionDenylist.getLoadSuccesses(); |
| } |
| |
| @Override |
| public void setEnablePartitionDenylist(boolean enabled) |
| { |
| DatabaseDescriptor.setPartitionDenylistEnabled(enabled); |
| } |
| |
| @Override |
| public void setEnableDenylistWrites(boolean enabled) |
| { |
| DatabaseDescriptor.setDenylistWritesEnabled(enabled); |
| } |
| |
| @Override |
| public void setEnableDenylistReads(boolean enabled) |
| { |
| DatabaseDescriptor.setDenylistReadsEnabled(enabled); |
| } |
| |
| @Override |
| public void setEnableDenylistRangeReads(boolean enabled) |
| { |
| DatabaseDescriptor.setDenylistRangeReadsEnabled(enabled); |
| } |
| |
| @Override |
| public void setDenylistMaxKeysPerTable(int value) |
| { |
| DatabaseDescriptor.setDenylistMaxKeysPerTable(value); |
| } |
| |
| @Override |
| public void setDenylistMaxKeysTotal(int value) |
| { |
| DatabaseDescriptor.setDenylistMaxKeysTotal(value); |
| } |
| |
| /** |
| * Actively denies read and write access to the provided Partition Key |
| * @param keyspace Name of keyspace containing the PK you wish to deny access to |
| * @param table Name of table containing the PK you wish to deny access to |
| * @param partitionKeyAsString String representation of the PK you want to deny access to |
| * @return true if successfully added, false if failure |
| */ |
| @Override |
| public boolean denylistKey(String keyspace, String table, String partitionKeyAsString) |
| { |
| if (!Schema.instance.getKeyspaces().contains(keyspace)) |
| return false; |
| |
| final ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(keyspace, table); |
| if (cfs == null) |
| return false; |
| |
| final ByteBuffer bytes = cfs.metadata.get().partitionKeyType.fromString(partitionKeyAsString); |
| return partitionDenylist.addKeyToDenylist(keyspace, table, bytes); |
| } |
| |
| /** |
| * Attempts to remove the provided pk from the ks + table deny list |
| * @param keyspace Keyspace containing the pk to remove the denylist entry for |
| * @param table Table containing the pk to remove denylist entry for |
| * @param partitionKeyAsString String representation of the PK you want to re-allow access to |
| * @return true if found and removed, false if not |
| */ |
| @Override |
| public boolean removeDenylistKey(String keyspace, String table, String partitionKeyAsString) |
| { |
| if (!Schema.instance.getKeyspaces().contains(keyspace)) |
| return false; |
| |
| final ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(keyspace, table); |
| if (cfs == null) |
| return false; |
| |
| final ByteBuffer bytes = cfs.metadata.get().partitionKeyType.fromString(partitionKeyAsString); |
| return partitionDenylist.removeKeyFromDenylist(keyspace, table, bytes); |
| } |
| |
| /** |
| * A simple check for operators to determine what the denylisted value for a pk is on a node |
| */ |
| public boolean isKeyDenylisted(String keyspace, String table, String partitionKeyAsString) |
| { |
| if (!Schema.instance.getKeyspaces().contains(keyspace)) |
| return false; |
| |
| final ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(keyspace, table); |
| if (cfs == null) |
| return false; |
| |
| final ByteBuffer bytes = cfs.metadata.get().partitionKeyType.fromString(partitionKeyAsString); |
| return !partitionDenylist.isKeyPermitted(keyspace, table, bytes); |
| } |
| |
| @Override |
| public void logBlockingReadRepairAttemptsForNSeconds(int seconds) |
| { |
| logBlockingReadRepairAttemptsUntilNanos = nanoTime() + TimeUnit.SECONDS.toNanos(seconds); |
| } |
| |
| @Override |
| public boolean isLoggingReadRepairs() |
| { |
| return nanoTime() <= StorageProxy.instance.logBlockingReadRepairAttemptsUntilNanos; |
| } |
| |
| @Override |
| public void setPaxosVariant(String variant) |
| { |
| Preconditions.checkNotNull(variant); |
| Paxos.setPaxosVariant(Config.PaxosVariant.valueOf(variant)); |
| } |
| |
| @Override |
| public String getPaxosVariant() |
| { |
| return Paxos.getPaxosVariant().toString(); |
| } |
| |
| @Override |
| public boolean getUseStatementsEnabled() |
| { |
| return DatabaseDescriptor.getUseStatementsEnabled(); |
| } |
| |
| @Override |
| public void setUseStatementsEnabled(boolean enabled) |
| { |
| DatabaseDescriptor.setUseStatementsEnabled(enabled); |
| } |
| |
| public void setPaxosContentionStrategy(String spec) |
| { |
| ContentionStrategy.setStrategy(spec); |
| } |
| |
| public String getPaxosContentionStrategy() |
| { |
| return ContentionStrategy.getStrategySpec(); |
| } |
| |
| @Override |
| public void setPaxosCoordinatorLockingDisabled(boolean disabled) |
| { |
| PaxosState.setDisableCoordinatorLocking(disabled); |
| } |
| |
| @Override |
| public boolean getPaxosCoordinatorLockingDisabled() |
| { |
| return PaxosState.getDisableCoordinatorLocking(); |
| } |
| } |