blob: 52beab9d7d450003f8208f2b9aa513c3ac390dfe [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.tx.impl;
import static org.apache.ignite.internal.util.CompletableFutures.allOf;
import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.ignite.internal.hlc.ClockService;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.placementdriver.PlacementDriver;
import org.apache.ignite.internal.replicator.ReplicaService;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.replicator.exception.PrimaryReplicaMissException;
import org.apache.ignite.internal.tx.message.TxMessagesFactory;
import org.apache.ignite.internal.tx.message.VacuumTxStateReplicaRequest;
import org.apache.ignite.network.ClusterNode;
/**
* Implements the logic of persistent tx states vacuum.
*/
public class PersistentTxStateVacuumizer {
private static final IgniteLogger LOG = Loggers.forClass(PersistentTxStateVacuumizer.class);
private static final TxMessagesFactory TX_MESSAGES_FACTORY = new TxMessagesFactory();
private final ReplicaService replicaService;
private final ClusterNode localNode;
private final ClockService clockService;
private final PlacementDriver placementDriver;
/**
* Constructor.
*
* @param replicaService Replica service.
* @param localNode Local node.
* @param clockService Clock service.
* @param placementDriver Placement driver.
*/
public PersistentTxStateVacuumizer(
ReplicaService replicaService,
ClusterNode localNode,
ClockService clockService,
PlacementDriver placementDriver
) {
this.replicaService = replicaService;
this.localNode = localNode;
this.clockService = clockService;
this.placementDriver = placementDriver;
}
/**
* Vacuum persistent tx states.
*
* @param txIds Transaction ids to vacuum; map of commit partition ids to sets of tx ids.
* @return A future, result is the set of successfully vacuumized txn states.
*/
public CompletableFuture<Set<UUID>> vacuumPersistentTxStates(Map<TablePartitionId, Set<UUID>> txIds) {
Set<UUID> successful = ConcurrentHashMap.newKeySet();
List<CompletableFuture<?>> futures = new ArrayList<>();
HybridTimestamp now = clockService.now();
txIds.forEach((commitPartitionId, txs) -> {
CompletableFuture<?> future = placementDriver.getPrimaryReplica(commitPartitionId, now)
.thenCompose(replicaMeta -> {
// If the primary replica is absent or is not located on the local node, this means that the primary either is
// on another node or would be re-elected on local one; then the volatile state (as well as cleanup completion
// timestamp) would be updated there, and then this operation would be called from there.
// Also, we are going to send the vacuum request only to the local node.
if (replicaMeta != null && localNode.id().equals(replicaMeta.getLeaseholderId())) {
VacuumTxStateReplicaRequest request = TX_MESSAGES_FACTORY.vacuumTxStateReplicaRequest()
.enlistmentConsistencyToken(replicaMeta.getStartTime().longValue())
.groupId(commitPartitionId)
.transactionIds(txs)
.build();
return replicaService.invoke(localNode, request).whenComplete((v, e) -> {
if (e == null) {
successful.addAll(txs);
// We can log the exceptions without further handling because failed requests' txns are not added
// to the set of successful and will be retried. PrimaryReplicaMissException can be considered as
// a part of regular flow and doesn't need to be logged.
} else if (unwrapCause(e) instanceof PrimaryReplicaMissException) {
LOG.debug("Failed to vacuum tx states from the persistent storage.", e);
} else {
LOG.warn("Failed to vacuum tx states from the persistent storage.", e);
}
});
} else {
successful.addAll(txs);
return nullCompletedFuture();
}
});
futures.add(future);
});
return allOf(futures.toArray(new CompletableFuture[0]))
.handle((unused, unusedEx) -> successful);
}
}