| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.cassandra.service.paxos.cleanup; |
| |
| import java.util.Collection; |
| import java.util.Map; |
| import java.util.UUID; |
| import java.util.concurrent.ConcurrentHashMap; |
| |
| import com.google.common.base.Preconditions; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.cassandra.config.DatabaseDescriptor; |
| import org.apache.cassandra.db.ConsistencyLevel; |
| import org.apache.cassandra.db.DecoratedKey; |
| import org.apache.cassandra.dht.Range; |
| import org.apache.cassandra.dht.Token; |
| import org.apache.cassandra.schema.Schema; |
| import org.apache.cassandra.schema.TableId; |
| import org.apache.cassandra.schema.TableMetadata; |
| import org.apache.cassandra.service.paxos.AbstractPaxosRepair; |
| import org.apache.cassandra.service.paxos.PaxosRepair; |
| import org.apache.cassandra.service.paxos.PaxosState; |
| import org.apache.cassandra.service.paxos.uncommitted.UncommittedPaxosKey; |
| import org.apache.cassandra.utils.CloseableIterator; |
| import org.apache.cassandra.utils.concurrent.AsyncFuture; |
| |
| import static org.apache.cassandra.service.paxos.cleanup.PaxosCleanupSession.TIMEOUT_NANOS; |
| import static org.apache.cassandra.utils.Clock.Global.nanoTime; |
| |
| public class PaxosCleanupLocalCoordinator extends AsyncFuture<PaxosCleanupResponse> |
| { |
| private static final Logger logger = LoggerFactory.getLogger(PaxosCleanupLocalCoordinator.class); |
| private static final UUID INTERNAL_SESSION = new UUID(0, 0); |
| |
| private final UUID session; |
| private final TableId tableId; |
| private final TableMetadata table; |
| private final Collection<Range<Token>> ranges; |
| private final CloseableIterator<UncommittedPaxosKey> uncommittedIter; |
| private int count = 0; |
| private final long deadline; |
| |
| private final Map<DecoratedKey, AbstractPaxosRepair> inflight = new ConcurrentHashMap<>(); |
| private final PaxosTableRepairs tableRepairs; |
| |
| private PaxosCleanupLocalCoordinator(UUID session, TableId tableId, Collection<Range<Token>> ranges, CloseableIterator<UncommittedPaxosKey> uncommittedIter) |
| { |
| this.session = session; |
| this.tableId = tableId; |
| this.table = Schema.instance.getTableMetadata(tableId); |
| this.ranges = ranges; |
| this.uncommittedIter = uncommittedIter; |
| this.tableRepairs = PaxosTableRepairs.getForTable(tableId); |
| this.deadline = TIMEOUT_NANOS + nanoTime(); |
| } |
| |
| public synchronized void start() |
| { |
| if (table == null) |
| { |
| fail("Unknown tableId: " + tableId); |
| return; |
| } |
| |
| if (!PaxosRepair.validatePeerCompatibility(table, ranges)) |
| { |
| fail("Unsupported peer versions for " + tableId + ' ' + ranges.toString()); |
| return; |
| } |
| |
| logger.info("Completing uncommitted paxos instances for {} on ranges {} for session {}", table, ranges, session); |
| |
| scheduleKeyRepairsOrFinish(); |
| } |
| |
| @SuppressWarnings("resource") |
| public static PaxosCleanupLocalCoordinator create(PaxosCleanupRequest request) |
| { |
| CloseableIterator<UncommittedPaxosKey> iterator = PaxosState.uncommittedTracker().uncommittedKeyIterator(request.tableId, request.ranges); |
| return new PaxosCleanupLocalCoordinator(request.session, request.tableId, request.ranges, iterator); |
| } |
| |
| @SuppressWarnings("resource") |
| public static PaxosCleanupLocalCoordinator createForAutoRepair(TableId tableId, Collection<Range<Token>> ranges) |
| { |
| CloseableIterator<UncommittedPaxosKey> iterator = PaxosState.uncommittedTracker().uncommittedKeyIterator(tableId, ranges); |
| return new PaxosCleanupLocalCoordinator(INTERNAL_SESSION, tableId, ranges, iterator); |
| } |
| |
| /** |
| * Schedule as many key repairs as we can, up to the paralellism limit. If no repairs are scheduled and |
| * none are in flight when the iterator is exhausted, the session will be finished |
| */ |
| private void scheduleKeyRepairsOrFinish() |
| { |
| int parallelism = DatabaseDescriptor.getPaxosRepairParallelism(); |
| Preconditions.checkArgument(parallelism > 0); |
| if (inflight.size() < parallelism) |
| { |
| if (nanoTime() - deadline >= 0) |
| { |
| fail("timeout"); |
| return; |
| } |
| |
| while (inflight.size() < parallelism && uncommittedIter.hasNext()) |
| repairKey(uncommittedIter.next()); |
| |
| } |
| |
| if (inflight.isEmpty()) |
| finish(); |
| } |
| |
| private boolean repairKey(UncommittedPaxosKey uncommitted) |
| { |
| logger.trace("repairing {}", uncommitted); |
| Preconditions.checkState(!inflight.containsKey(uncommitted.getKey())); |
| ConsistencyLevel consistency = uncommitted.getConsistencyLevel(); |
| |
| // we don't know the consistency of this operation, presumably because it originated |
| // before we started tracking paxos cl, so we don't attempt to repair it |
| if (consistency == null) |
| return false; |
| |
| inflight.put(uncommitted.getKey(), tableRepairs.startOrGetOrQueue(uncommitted.getKey(), uncommitted.ballot(), uncommitted.getConsistencyLevel(), table, result -> { |
| if (result.wasSuccessful()) |
| onKeyFinish(uncommitted.getKey()); |
| else |
| onKeyFailure(result.toString()); |
| })); |
| return true; |
| } |
| |
| private synchronized void onKeyFinish(DecoratedKey key) |
| { |
| if (!inflight.containsKey(key)) |
| return; |
| logger.trace("finished repairing {}", key); |
| inflight.remove(key); |
| count++; |
| |
| scheduleKeyRepairsOrFinish(); |
| } |
| |
| private void complete(PaxosCleanupResponse response) |
| { |
| uncommittedIter.close(); |
| trySuccess(response); |
| } |
| |
| private void onKeyFailure(String reason) |
| { |
| // not synchronized to avoid deadlock with callback we register on start |
| inflight.values().forEach(AbstractPaxosRepair::cancel); |
| fail(reason); |
| } |
| |
| private synchronized void fail(String reason) |
| { |
| logger.info("Failing paxos cleanup session {} for {} on ranges {}. Reason: {}", session, table, ranges, reason); |
| complete(PaxosCleanupResponse.failed(session, reason)); |
| } |
| |
| private void finish() |
| { |
| logger.info("Completed {} uncommitted paxos instances for {} on ranges {} for session {}", count, table, ranges, session); |
| complete(PaxosCleanupResponse.success(session)); |
| } |
| } |