/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.table.distributed.disaster;
import static java.util.Collections.emptyList;
import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;
import static java.util.stream.Collectors.toSet;
import static org.apache.ignite.internal.table.distributed.disaster.GlobalPartitionStateEnum.AVAILABLE;
import static org.apache.ignite.internal.table.distributed.disaster.GlobalPartitionStateEnum.DEGRADED;
import static org.apache.ignite.internal.table.distributed.disaster.GlobalPartitionStateEnum.READ_ONLY;
import static org.apache.ignite.internal.table.distributed.disaster.LocalPartitionStateEnum.BROKEN;
import static org.apache.ignite.internal.table.distributed.disaster.LocalPartitionStateEnum.CATCHING_UP;
import static org.apache.ignite.internal.table.distributed.disaster.LocalPartitionStateEnum.HEALTHY;
import static org.apache.ignite.internal.table.distributed.disaster.LocalPartitionStateEnum.INITIALIZING;
import static org.apache.ignite.internal.table.distributed.disaster.LocalPartitionStateEnum.INSTALLING_SNAPSHOT;
import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.apache.ignite.lang.ErrorGroups.DisasterRecovery.PARTITION_STATE_ERR;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.internal.affinity.Assignments;
import org.apache.ignite.internal.catalog.Catalog;
import org.apache.ignite.internal.catalog.CatalogManager;
import org.apache.ignite.internal.catalog.descriptors.CatalogObjectDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
import org.apache.ignite.internal.distributionzones.DistributionZoneManager;
import org.apache.ignite.internal.distributionzones.NodeWithAttributes;
import org.apache.ignite.internal.lang.ByteArray;
import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.metastorage.Entry;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.metastorage.WatchEvent;
import org.apache.ignite.internal.metastorage.WatchListener;
import org.apache.ignite.internal.network.MessagingService;
import org.apache.ignite.internal.network.NetworkMessage;
import org.apache.ignite.internal.raft.Loza;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.table.distributed.TableMessageGroup;
import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
import org.apache.ignite.internal.table.distributed.disaster.exceptions.DisasterRecoveryException;
import org.apache.ignite.internal.table.distributed.disaster.exceptions.NodesNotFoundException;
import org.apache.ignite.internal.table.distributed.disaster.exceptions.PartitionsNotFoundException;
import org.apache.ignite.internal.table.distributed.disaster.exceptions.ZonesNotFoundException;
import org.apache.ignite.internal.table.distributed.disaster.messages.LocalPartitionStateMessage;
import org.apache.ignite.internal.table.distributed.disaster.messages.LocalPartitionStatesRequest;
import org.apache.ignite.internal.table.distributed.disaster.messages.LocalPartitionStatesResponse;
import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.internal.util.CollectionUtils;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.raft.jraft.Node;
import org.apache.ignite.raft.jraft.core.State;
import org.jetbrains.annotations.Nullable;
/**
 * Manager responsible for "disaster recovery" operations.
 * Internally it triggers meta-storage updates in order to acquire a unique causality token.
 * As a reaction to these updates, the manager performs the actual recovery operations, such as {@link #resetPartitions(int, int)}.
* More details are in the <a href="https://issues.apache.org/jira/browse/IGNITE-21140">epic</a>.
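 *
 * <p>A minimal usage sketch, given a {@code disasterRecoveryManager} instance (the zone and table IDs are illustrative placeholders;
 * error handling is omitted):
 * <pre>{@code
 * // Force a rebalance for every partition of table 5 in zone 1 that has lost its majority.
 * CompletableFuture<Void> reset = disasterRecoveryManager.resetPartitions(1, 5);
 * reset.get(30, TimeUnit.SECONDS);
 * }</pre>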
*/
public class DisasterRecoveryManager implements IgniteComponent {
/** Logger. */
private static final IgniteLogger LOG = Loggers.forClass(DisasterRecoveryManager.class);
/** Single key for writing disaster recovery requests into meta-storage. */
static final ByteArray RECOVERY_TRIGGER_KEY = new ByteArray("disaster.recovery.trigger");
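    /** Table messages factory. */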
private static final TableMessagesFactory MSG_FACTORY = new TableMessagesFactory();
/** Disaster recovery operations timeout in seconds. */
private static final int TIMEOUT_SECONDS = 30;
/**
     * Maximum allowed difference between the committed index on the leader and on a follower, which distinguishes
     * {@link LocalPartitionStateEnum#HEALTHY} from {@link LocalPartitionStateEnum#CATCHING_UP}.
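     * For example, with a threshold of 100, a node whose reported log index lags the group's maximum by 100 or more entries is
     * reported as catching up.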
*/
private static final int CATCH_UP_THRESHOLD = 100;
/** Thread pool executor for async parts. */
private final ExecutorService threadPool;
/** Messaging service. */
private final MessagingService messagingService;
/** Meta-storage manager. */
final MetaStorageManager metaStorageManager;
/** Catalog manager. */
final CatalogManager catalogManager;
/** Distribution zone manager. */
final DistributionZoneManager dzManager;
/** Raft manager. */
private final Loza raftManager;
/** Watch listener for {@link #RECOVERY_TRIGGER_KEY}. */
private final WatchListener watchListener;
/**
     * Map of operations triggered by the local node that have not yet been processed by {@link #watchListener}. Values in the map are
     * the futures returned from {@link #processNewRequest(ManualGroupUpdateRequest)}; they are completed by
     * {@link #handleTriggerKeyUpdate(WatchEvent)} when the node receives the corresponding event from the meta-storage, or completed
     * exceptionally if the node doesn't receive this event within a 30-second window.
*/
private final Map<UUID, CompletableFuture<Void>> ongoingOperationsById = new ConcurrentHashMap<>();
/**
* Constructor.
*/
public DisasterRecoveryManager(
ExecutorService threadPool,
MessagingService messagingService,
MetaStorageManager metaStorageManager,
CatalogManager catalogManager,
DistributionZoneManager dzManager,
Loza raftManager
) {
this.threadPool = threadPool;
this.messagingService = messagingService;
this.metaStorageManager = metaStorageManager;
this.catalogManager = catalogManager;
this.dzManager = dzManager;
this.raftManager = raftManager;
watchListener = new WatchListener() {
@Override
public CompletableFuture<Void> onUpdate(WatchEvent event) {
return handleTriggerKeyUpdate(event);
}
@Override
public void onError(Throwable e) {
// No-op.
}
};
}
@Override
public CompletableFuture<Void> startAsync() {
messagingService.addMessageHandler(TableMessageGroup.class, this::handleMessage);
metaStorageManager.registerExactWatch(RECOVERY_TRIGGER_KEY, watchListener);
return nullCompletedFuture();
}
@Override
public CompletableFuture<Void> stopAsync() {
metaStorageManager.unregisterWatch(watchListener);
for (CompletableFuture<Void> future : ongoingOperationsById.values()) {
future.completeExceptionally(new NodeStoppingException());
}
return nullCompletedFuture();
}
/**
     * Updates assignments of the table in a forced manner, allowing recovery of Raft groups that have lost their majority. This is
     * achieved by triggering a new rebalance with the {@code force} flag enabled in {@link Assignments} for the partitions that require
     * it. New pending assignments with the {@code force} flag remove the old stable nodes from the distribution and force a new Raft
     * configuration via "resetPeers", so that a new leader can be elected.
*
* @param zoneId Distribution zone ID.
* @param tableId Table ID.
* @return Operation future.
*/
public CompletableFuture<Void> resetPartitions(int zoneId, int tableId) {
return processNewRequest(new ManualGroupUpdateRequest(UUID.randomUUID(), zoneId, tableId));
}
/**
     * Returns states of partitions in the cluster. The result is a mapping of {@link TablePartitionId} to a mapping from node name to
     * partition state.
*
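     * <p>A usage sketch (the zone name {@code "ZONE"} is an illustrative placeholder), requesting states of all partitions of all
     * tables in that zone from every node:
     * <pre>{@code
     * Map<TablePartitionId, LocalPartitionStateByNode> states =
     *         disasterRecoveryManager.localPartitionStates(Set.of("ZONE"), Set.of(), Set.of()).get();
     * }</pre>
     *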
* @param zoneNames Names specifying zones to get partition states from. Case-sensitive, empty set means "all zones".
* @param nodeNames Names specifying nodes to get partition states from. Case-sensitive, empty set means "all nodes".
* @param partitionIds IDs of partitions to get states of. Empty set means "all partitions".
* @return Future with the mapping.
*/
public CompletableFuture<Map<TablePartitionId, LocalPartitionStateByNode>> localPartitionStates(
Set<String> zoneNames,
Set<String> nodeNames,
Set<Integer> partitionIds
) {
Catalog catalog = catalogManager.catalog(catalogManager.latestCatalogVersion());
return localPartitionStatesInternal(zoneNames, nodeNames, partitionIds, catalog)
.thenApply(res -> normalizeLocal(res, catalog));
}
/**
     * Returns states of partitions in the cluster. The result is a mapping of {@link TablePartitionId} to the global partition state
     * enum value.
*
     * @param zoneNames Names specifying zones to get partition states from. Case-sensitive, empty set means "all zones".
* @param partitionIds IDs of partitions to get states of. Empty set means "all partitions".
* @return Future with the mapping.
*/
public CompletableFuture<Map<TablePartitionId, GlobalPartitionState>> globalPartitionStates(
Set<String> zoneNames,
Set<Integer> partitionIds
) {
Catalog catalog = catalogManager.catalog(catalogManager.latestCatalogVersion());
return localPartitionStatesInternal(zoneNames, Set.of(), partitionIds, catalog)
.thenApply(res -> normalizeLocal(res, catalog))
.thenApply(res -> assembleGlobal(res, catalog));
}
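
    /**
     * Collects local partition states from the cluster: sends a {@link LocalPartitionStatesRequest} to each of the resolved nodes and
     * merges the responses into a single map, keyed by partition ID and then by node name.
     */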
private CompletableFuture<Map<TablePartitionId, LocalPartitionStateMessageByNode>> localPartitionStatesInternal(
Set<String> zoneNames,
Set<String> nodeNames,
Set<Integer> partitionIds,
Catalog catalog
) {
Set<Integer> zoneIds = getZoneIds(zoneNames, catalog);
Set<NodeWithAttributes> nodes = getNodes(nodeNames);
LocalPartitionStatesRequest localPartitionStatesRequest = MSG_FACTORY.localPartitionStatesRequest()
.zoneIds(zoneIds)
.partitionIds(partitionIds)
.catalogVersion(catalog.version())
.build();
Map<TablePartitionId, LocalPartitionStateMessageByNode> result = new ConcurrentHashMap<>();
CompletableFuture<?>[] futures = new CompletableFuture[nodes.size()];
int i = 0;
for (NodeWithAttributes node : nodes) {
CompletableFuture<NetworkMessage> invokeFuture = messagingService.invoke(
node.nodeName(),
localPartitionStatesRequest,
TimeUnit.SECONDS.toMillis(TIMEOUT_SECONDS)
);
futures[i++] = invokeFuture.thenAccept(networkMessage -> {
assert networkMessage instanceof LocalPartitionStatesResponse : networkMessage;
var response = (LocalPartitionStatesResponse) networkMessage;
for (LocalPartitionStateMessage state : response.states()) {
result.compute(state.partitionId().asTablePartitionId(), (tablePartitionId, messageByNode) -> {
if (messageByNode == null) {
return new LocalPartitionStateMessageByNode(Map.of(node.nodeName(), state));
}
messageByNode = new LocalPartitionStateMessageByNode(messageByNode);
messageByNode.put(node.nodeName(), state);
return messageByNode;
});
}
});
}
return allOf(futures).handle((unused, err) -> {
if (err != null) {
throw new DisasterRecoveryException(PARTITION_STATE_ERR, err);
}
if (!partitionIds.isEmpty()) {
Set<Integer> foundPartitionIds = result.keySet().stream()
.map(TablePartitionId::partitionId)
.collect(toSet());
checkPartitions(foundPartitionIds, partitionIds);
}
return result;
});
}
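
    /**
     * Resolves node names into nodes of the logical topology. An empty set of names means the whole logical topology.
     *
     * @throws NodesNotFoundException If some of the requested nodes are missing from the logical topology.
     */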
private Set<NodeWithAttributes> getNodes(Set<String> nodeNames) throws NodesNotFoundException {
if (nodeNames.isEmpty()) {
return dzManager.logicalTopology();
}
Set<NodeWithAttributes> nodes = dzManager.logicalTopology().stream()
.filter(node -> nodeNames.contains(node.nodeName()))
.collect(toSet());
Set<String> foundNodeNames = nodes.stream()
.map(NodeWithAttributes::nodeName)
.collect(toSet());
if (!nodeNames.equals(foundNodeNames)) {
Set<String> missingNodeNames = CollectionUtils.difference(nodeNames, foundNodeNames);
throw new NodesNotFoundException(missingNodeNames);
}
return nodes;
}
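
    /**
     * Resolves zone names into zone IDs using the given catalog. An empty set of names yields an empty set of IDs, which means
     * "all zones".
     *
     * @throws ZonesNotFoundException If some of the requested zones are missing from the catalog.
     */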
private static Set<Integer> getZoneIds(Set<String> zoneNames, Catalog catalog) throws ZonesNotFoundException {
if (zoneNames.isEmpty()) {
return Set.of();
}
List<CatalogZoneDescriptor> zoneDescriptors = catalog.zones().stream()
.filter(catalogZoneDescriptor -> zoneNames.contains(catalogZoneDescriptor.name()))
.collect(toList());
Set<String> foundZoneNames = zoneDescriptors.stream()
.map(CatalogObjectDescriptor::name)
.collect(toSet());
if (!zoneNames.equals(foundZoneNames)) {
Set<String> missingZoneNames = CollectionUtils.difference(zoneNames, foundZoneNames);
throw new ZonesNotFoundException(missingZoneNames);
}
return zoneDescriptors.stream()
.map(CatalogObjectDescriptor::id)
.collect(toSet());
}
/**
     * Creates a new operation future associated with the request, and writes the request into the meta-storage under
     * {@link #RECOVERY_TRIGGER_KEY}.
*
* @param request Request.
* @return Operation future.
*/
private CompletableFuture<Void> processNewRequest(ManualGroupUpdateRequest request) {
UUID operationId = request.operationId();
CompletableFuture<Void> operationFuture = new CompletableFuture<Void>()
.whenComplete((v, throwable) -> ongoingOperationsById.remove(operationId))
.orTimeout(TIMEOUT_SECONDS, TimeUnit.SECONDS);
ongoingOperationsById.put(operationId, operationFuture);
metaStorageManager.put(RECOVERY_TRIGGER_KEY, ByteUtils.toBytes(request));
return operationFuture;
}
/**
* Handler for {@link #RECOVERY_TRIGGER_KEY} update event. Deserializes the request and delegates the execution to
* {@link DisasterRecoveryRequest#handle(DisasterRecoveryManager, long, CompletableFuture)}.
*/
private CompletableFuture<Void> handleTriggerKeyUpdate(WatchEvent watchEvent) {
Entry newEntry = watchEvent.entryEvent().newEntry();
byte[] requestBytes = newEntry.value();
assert requestBytes != null;
DisasterRecoveryRequest request;
try {
request = ByteUtils.fromBytes(requestBytes);
} catch (Exception e) {
LOG.warn("Unable to deserialize disaster recovery request.", e);
return nullCompletedFuture();
}
CompletableFuture<Void> operationFuture = ongoingOperationsById.remove(request.operationId());
if (operationFuture == null) {
            // We're not the initiator, or the timeout has passed. Just ignore the event.
return nullCompletedFuture();
}
return request.handle(this, watchEvent.revision(), operationFuture);
}
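
    /** Dispatches incoming messages of the {@link TableMessageGroup}; currently only {@link LocalPartitionStatesRequest} is handled. */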
private void handleMessage(NetworkMessage message, ClusterNode sender, @Nullable Long correlationId) {
if (message instanceof LocalPartitionStatesRequest) {
handleLocalPartitionStatesRequest((LocalPartitionStatesRequest) message, sender, correlationId);
}
}
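
    /**
     * Handles a {@link LocalPartitionStatesRequest}: waits until the requested catalog version is available locally, collects the
     * states of local Raft nodes that match the zone and partition filters, and responds to the sender with a
     * {@link LocalPartitionStatesResponse}.
     */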
private void handleLocalPartitionStatesRequest(LocalPartitionStatesRequest request, ClusterNode sender, @Nullable Long correlationId) {
assert correlationId != null;
int catalogVersion = request.catalogVersion();
catalogManager.catalogReadyFuture(catalogVersion).thenRunAsync(() -> {
List<LocalPartitionStateMessage> statesList = new ArrayList<>();
raftManager.forEach((raftNodeId, raftGroupService) -> {
if (raftNodeId.groupId() instanceof TablePartitionId) {
var tablePartitionId = (TablePartitionId) raftNodeId.groupId();
if (!containsOrEmpty(tablePartitionId.partitionId(), request.partitionIds())) {
return;
}
CatalogTableDescriptor tableDescriptor = catalogManager.table(tablePartitionId.tableId(), catalogVersion);
                    // Only tables that belong to the requested catalog version are returned.
                    if (tableDescriptor == null || !containsOrEmpty(tableDescriptor.zoneId(), request.zoneIds())) {
return;
}
Node raftNode = raftGroupService.getRaftNode();
State nodeState = raftNode.getNodeState();
LocalPartitionStateEnum localState = convertState(nodeState);
long lastLogIndex = raftNode.lastLogIndex();
if (localState == HEALTHY) {
                        // A node without a log hasn't processed anything yet; it's not really "healthy" until it accepts the
                        // leader's configuration.
if (lastLogIndex == 0) {
localState = INITIALIZING;
}
if (raftNode.isInstallingSnapshot()) {
localState = INSTALLING_SNAPSHOT;
}
}
statesList.add(MSG_FACTORY.localPartitionStateMessage()
.partitionId(MSG_FACTORY.tablePartitionIdMessage()
.tableId(tablePartitionId.tableId())
.partitionId(tablePartitionId.partitionId())
.build())
.state(localState)
.logIndex(lastLogIndex)
.build()
);
}
});
LocalPartitionStatesResponse response = MSG_FACTORY.localPartitionStatesResponse().states(statesList).build();
messagingService.respond(sender, response, correlationId);
}, threadPool);
}
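
    /** Returns {@code true} if the collection is empty (meaning "no filter") or contains the given item. */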
private static <T> boolean containsOrEmpty(T item, Collection<T> collection) {
return collection.isEmpty() || collection.contains(item);
}
/**
* Converts internal raft node state into public local partition state.
*/
private static LocalPartitionStateEnum convertState(State nodeState) {
switch (nodeState) {
case STATE_LEADER:
case STATE_TRANSFERRING:
case STATE_CANDIDATE:
case STATE_FOLLOWER:
return HEALTHY;
case STATE_ERROR:
return BROKEN;
case STATE_UNINITIALIZED:
return INITIALIZING;
case STATE_SHUTTING:
case STATE_SHUTDOWN:
case STATE_END:
return LocalPartitionStateEnum.UNAVAILABLE;
default:
// Unrecognized state, better safe than sorry.
return BROKEN;
}
}
/**
     * Checks that the resulting states contain all partition IDs from the request.
*
* @param foundPartitionIds Found partition IDs.
* @param requestedPartitionIds Requested partition IDs.
* @throws PartitionsNotFoundException if some IDs are missing.
*/
private static void checkPartitions(
Set<Integer> foundPartitionIds,
Set<Integer> requestedPartitionIds
) throws PartitionsNotFoundException {
if (!requestedPartitionIds.equals(foundPartitionIds)) {
Set<Integer> missingPartitionIds = CollectionUtils.difference(requestedPartitionIds, foundPartitionIds);
throw new PartitionsNotFoundException(missingPartitionIds);
}
}
/**
     * Replaces some {@link LocalPartitionStateEnum#HEALTHY} states with {@link LocalPartitionStateEnum#CATCHING_UP}; this can only be
     * done once the states of all peers are known.
*/
private static Map<TablePartitionId, LocalPartitionStateByNode> normalizeLocal(
Map<TablePartitionId, LocalPartitionStateMessageByNode> result,
Catalog catalog
) {
Map<TablePartitionId, LocalPartitionStateByNode> map = new HashMap<>();
for (Map.Entry<TablePartitionId, LocalPartitionStateMessageByNode> entry : result.entrySet()) {
TablePartitionId tablePartitionId = entry.getKey();
LocalPartitionStateMessageByNode messageByNode = entry.getValue();
// noinspection OptionalGetWithoutIsPresent
long maxLogIndex = messageByNode.values().stream()
.mapToLong(LocalPartitionStateMessage::logIndex)
.max()
.getAsLong();
Map<String, LocalPartitionState> nodeToStateMap = messageByNode.entrySet().stream()
.collect(toMap(Map.Entry::getKey, nodeToState ->
toLocalPartitionState(nodeToState, maxLogIndex, tablePartitionId, catalog))
);
map.put(tablePartitionId, new LocalPartitionStateByNode(nodeToStateMap));
}
return map;
}
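
    /**
     * Converts a single node's state message into a {@link LocalPartitionState}, downgrading {@link LocalPartitionStateEnum#HEALTHY} to
     * {@link LocalPartitionStateEnum#CATCHING_UP} when the node's log index lags the group maximum by {@link #CATCH_UP_THRESHOLD} or
     * more.
     */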
private static LocalPartitionState toLocalPartitionState(
Map.Entry<String, LocalPartitionStateMessage> nodeToMessage,
long maxLogIndex,
TablePartitionId tablePartitionId,
Catalog catalog
) {
LocalPartitionStateMessage stateMsg = nodeToMessage.getValue();
LocalPartitionStateEnum stateEnum = stateMsg.state();
if (stateMsg.state() == HEALTHY && maxLogIndex - stateMsg.logIndex() >= CATCH_UP_THRESHOLD) {
stateEnum = CATCHING_UP;
}
        // Tables returned from the local states request are always present in the required version of the catalog.
CatalogTableDescriptor tableDescriptor = catalog.table(tablePartitionId.tableId());
String zoneName = catalog.zone(tableDescriptor.zoneId()).name();
return new LocalPartitionState(tableDescriptor.name(), zoneName, tablePartitionId.partitionId(), stateEnum);
}
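
    /**
     * Aggregates local per-node states into a single {@link GlobalPartitionState} per partition, and adds explicit
     * {@link GlobalPartitionStateEnum#UNAVAILABLE} entries for the partitions that returned no local states at all.
     */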
private static Map<TablePartitionId, GlobalPartitionState> assembleGlobal(
Map<TablePartitionId, LocalPartitionStateByNode> localResult,
Catalog catalog
) {
Map<TablePartitionId, GlobalPartitionState> result = localResult.entrySet().stream()
.collect(toMap(Map.Entry::getKey, entry -> {
TablePartitionId tablePartitionId = entry.getKey();
LocalPartitionStateByNode map = entry.getValue();
return assembleGlobalStateFromLocal(catalog, tablePartitionId, map);
}));
localResult.keySet().stream()
.map(TablePartitionId::tableId)
.distinct()
.forEach(tableId -> {
int zoneId = catalog.table(tableId).zoneId();
CatalogZoneDescriptor zoneDescriptor = catalog.zone(zoneId);
int partitions = zoneDescriptor.partitions();
// Make missing partitions explicitly unavailable.
for (int partitionId = 0; partitionId < partitions; partitionId++) {
TablePartitionId tablePartitionId = new TablePartitionId(tableId, partitionId);
result.computeIfAbsent(tablePartitionId, key ->
new GlobalPartitionState(catalog.table(key.tableId()).name(), zoneDescriptor.name(), key.partitionId(),
GlobalPartitionStateEnum.UNAVAILABLE)
);
}
});
return result;
}
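
    /**
     * Derives the global state of a single partition from the number of healthy replicas: all replicas healthy means
     * {@link GlobalPartitionStateEnum#AVAILABLE}, a healthy majority (quorum) means {@link GlobalPartitionStateEnum#DEGRADED}, at least
     * one healthy replica means {@link GlobalPartitionStateEnum#READ_ONLY}, and no healthy replicas means
     * {@link GlobalPartitionStateEnum#UNAVAILABLE}.
     */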
private static GlobalPartitionState assembleGlobalStateFromLocal(
Catalog catalog,
TablePartitionId tablePartitionId,
LocalPartitionStateByNode map
) {
        // Tables returned from the local states request are always present in the required version of the catalog.
int zoneId = catalog.table(tablePartitionId.tableId()).zoneId();
CatalogZoneDescriptor zoneDescriptor = catalog.zone(zoneId);
int replicas = zoneDescriptor.replicas();
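        // A quorum is a majority of the replica count, e.g. 2 out of 3.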
int quorum = replicas / 2 + 1;
Map<LocalPartitionStateEnum, List<LocalPartitionState>> groupedStates = map.values().stream()
.collect(groupingBy(localPartitionState -> localPartitionState.state));
GlobalPartitionStateEnum globalStateEnum;
int healthyReplicas = groupedStates.getOrDefault(HEALTHY, emptyList()).size();
if (healthyReplicas == replicas) {
globalStateEnum = AVAILABLE;
} else if (healthyReplicas >= quorum) {
globalStateEnum = DEGRADED;
} else if (healthyReplicas > 0) {
globalStateEnum = READ_ONLY;
} else {
globalStateEnum = GlobalPartitionStateEnum.UNAVAILABLE;
}
LocalPartitionState anyLocalState = map.values().iterator().next();
return new GlobalPartitionState(anyLocalState.tableName, zoneDescriptor.name(), tablePartitionId.partitionId(), globalStateEnum);
}
}