/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.container.replication;
import com.google.common.collect.ImmutableList;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.StorageUnit;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.scm.PlacementPolicy;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.ozone.protocol.commands.ReconstructECContainersCommand;
import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import static java.util.Collections.emptyMap;
/**
 * Handles EC under-replication processing and forms the respective SCM
 * commands.
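 * <p>
 * A rough construction sketch (illustrative only; the placementPolicy, conf
 * and nodeManager instances are assumed to be supplied by the caller,
 * normally the replication manager framework):
 * <pre>{@code
 * ECUnderReplicationHandler handler =
 *     new ECUnderReplicationHandler(placementPolicy, conf, nodeManager);
 * }</pre>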
*/
public class ECUnderReplicationHandler implements UnhealthyReplicationHandler {
public static final Logger LOG =
LoggerFactory.getLogger(ECUnderReplicationHandler.class);
private final ECContainerHealthCheck ecContainerHealthCheck =
new ECContainerHealthCheck();
private final PlacementPolicy containerPlacement;
private final long currentContainerSize;
private final NodeManager nodeManager;
public ECUnderReplicationHandler(
final PlacementPolicy containerPlacement, final ConfigurationSource conf,
NodeManager nodeManager) {
this.containerPlacement = containerPlacement;
this.currentContainerSize = (long) conf
.getStorageSize(ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES);
this.nodeManager = nodeManager;
}
/**
 * Identify a new set of datanodes to reconstruct the container on and form
 * the SCM commands to send to them. For indexes that are only available on
 * decommissioning nodes, it generates replicate commands instead of
 * reconstruction commands.
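 * <p>
 * An illustrative sketch of how a caller might consume the result (the
 * handler and other variable names below are placeholders, not part of this
 * class):
 * <pre>{@code
 * Map<DatanodeDetails, SCMCommand<?>> commands =
 *     handler.processAndCreateCommands(replicas, pendingOps, result,
 *         remainingMaintenanceRedundancy);
 * if (commands.isEmpty()) {
 *   // Nothing to send: the container is no longer under-replicated,
 *   // is already covered by pending ops, or is unrecoverable.
 * }
 * }</pre>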
*
* @param replicas - Set of available container replicas.
* @param pendingOps - Inflight replications and deletion ops.
* @param result - Health check result.
 * @param remainingMaintenanceRedundancy - the number of redundant replicas
 * that must remain online when nodes go into maintenance.
 * @return a map from the destination datanode on which each command should
 * be executed to the command itself. If an empty map is returned, the
 * container is no longer unhealthy and can be removed from the unhealthy
 * queue. Any exception indicates that the container is still unhealthy and
 * should be retried later.
*/
@Override
public Map<DatanodeDetails, SCMCommand<?>> processAndCreateCommands(
final Set<ContainerReplica> replicas,
final List<ContainerReplicaOp> pendingOps,
final ContainerHealthResult result,
final int remainingMaintenanceRedundancy) throws IOException {
ContainerInfo container = result.getContainerInfo();
ECReplicationConfig repConfig =
(ECReplicationConfig) container.getReplicationConfig();
final ECContainerReplicaCount replicaCount =
new ECContainerReplicaCount(container, replicas, pendingOps,
remainingMaintenanceRedundancy);
ContainerHealthResult currentUnderRepRes = ecContainerHealthCheck
.checkHealth(container, replicas, pendingOps,
remainingMaintenanceRedundancy);
LOG.debug("Handling under-replicated EC container: {}", container);
if (currentUnderRepRes
.getHealthState() != ContainerHealthResult.HealthState
.UNDER_REPLICATED) {
LOG.info("The container {} state changed and it's not in under"
+ " replication any more. Current state is: {}",
container.getContainerID(), currentUnderRepRes);
return emptyMap();
}
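    // Nodes that already host a replica must not be chosen as new targets.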
List<DatanodeDetails> excludedNodes = replicas.stream()
.map(ContainerReplica::getDatanodeDetails)
.collect(Collectors.toList());
ContainerHealthResult.UnderReplicatedHealthResult containerHealthResult =
((ContainerHealthResult.UnderReplicatedHealthResult)
currentUnderRepRes);
if (containerHealthResult.isSufficientlyReplicatedAfterPending()) {
LOG.info("The container {} with replicas {} is sufficiently replicated",
container.getContainerID(), replicaCount.getReplicas());
return emptyMap();
}
if (replicaCount.isUnrecoverable()) {
LOG.warn("The container {} is unrecoverable. The available replicas" +
" are: {}.", container.containerID(), replicaCount.getReplicas());
return emptyMap();
}
final ContainerID id = container.containerID();
final Map<DatanodeDetails, SCMCommand<?>> commands = new HashMap<>();
try {
// State is UNDER_REPLICATED
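      // Collect the targets of any pending deletes so that replicas on those
      // nodes are not used as reconstruction sources below.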
final List<DatanodeDetails> deletionInFlight = new ArrayList<>();
for (ContainerReplicaOp op : pendingOps) {
if (op.getOpType() == ContainerReplicaOp.PendingOpType.DELETE) {
deletionInFlight.add(op.getTarget());
}
}
List<Integer> missingIndexes = replicaCount.unavailableIndexes(true);
      // We have the missing indexes, which already exclude any
      // decommissioning-only indexes. Find healthy source nodes.
      if (!missingIndexes.isEmpty()) {
Map<Integer, ContainerReplica> sources = replicas.stream().filter(r -> r
.getState() == StorageContainerDatanodeProtocolProtos
.ContainerReplicaProto.State.CLOSED)
// Exclude stale and dead nodes. This is particularly important for
// maintenance nodes, as the replicas will remain present in the
// container manager, even when they go dead.
.filter(r -> ReplicationManager
.getNodeStatus(r.getDatanodeDetails(), nodeManager).isHealthy())
.filter(r -> !deletionInFlight.contains(r.getDatanodeDetails()))
.collect(
Collectors.toMap(ContainerReplica::getReplicaIndex, x -> x));
LOG.debug("Missing indexes detected for the container {}." +
" The missing indexes are {}", id, missingIndexes);
        // We have enough healthy source replicas (at least the data count)
        // to reconstruct the missing indexes.
if (sources.size() >= repConfig.getData()) {
final List<DatanodeDetails> selectedDatanodes = getTargetDatanodes(
excludedNodes, container, missingIndexes.size());
excludedNodes.addAll(selectedDatanodes);
List<ReconstructECContainersCommand.DatanodeDetailsAndReplicaIndex>
sourceDatanodesWithIndex = new ArrayList<>();
for (ContainerReplica srcReplica : sources.values()) {
sourceDatanodesWithIndex.add(
new ReconstructECContainersCommand
.DatanodeDetailsAndReplicaIndex(
srcReplica.getDatanodeDetails(),
srcReplica.getReplicaIndex()));
}
final ReconstructECContainersCommand reconstructionCommand =
new ReconstructECContainersCommand(id.getProtobuf().getId(),
sourceDatanodesWithIndex, selectedDatanodes,
int2byte(missingIndexes),
repConfig);
// Keeping the first target node as coordinator.
commands.put(selectedDatanodes.get(0), reconstructionCommand);
} else {
LOG.warn("Cannot proceed for EC container reconstruction for {}, due"
+ " to insufficient source replicas found. Number of source "
+ "replicas needed: {}. Number of available source replicas are:"
+ " {}. Available sources are: {}", container.containerID(),
repConfig.getData(), sources.size(), sources);
}
}
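      // Next handle indexes that are present only on decommissioning nodes.
      // Those replicas are still readable, so a one-to-one copy to a new
      // target is sufficient and no EC reconstruction is needed.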
Set<Integer> decomIndexes = replicaCount.decommissioningOnlyIndexes(true);
      if (!decomIndexes.isEmpty()) {
final List<DatanodeDetails> selectedDatanodes =
getTargetDatanodes(excludedNodes, container, decomIndexes.size());
excludedNodes.addAll(selectedDatanodes);
Iterator<DatanodeDetails> iterator = selectedDatanodes.iterator();
        // In this case we need to do a one-to-one copy of each replica.
for (ContainerReplica replica : replicas) {
if (decomIndexes.contains(replica.getReplicaIndex())) {
if (!iterator.hasNext()) {
LOG.warn("Couldn't find enough targets. Available source"
+ " nodes: {}, the target nodes: {}, excluded nodes: {} and"
+ " the decommission indexes: {}",
replicas, selectedDatanodes, excludedNodes, decomIndexes);
break;
}
DatanodeDetails decommissioningSrcNode
= replica.getDatanodeDetails();
final ReplicateContainerCommand replicateCommand =
new ReplicateContainerCommand(id.getProtobuf().getId(),
ImmutableList.of(decommissioningSrcNode));
            // For EC containers we need to track which replica index is
            // being replicated, so add it to the command.
replicateCommand.setReplicaIndex(replica.getReplicaIndex());
DatanodeDetails target = iterator.next();
commands.put(target, replicateCommand);
}
}
}
} catch (IOException | IllegalStateException ex) {
LOG.warn("Exception while processing for creating the EC reconstruction" +
" container commands for {}.",
id, ex);
throw ex;
}
    if (commands.isEmpty()) {
      LOG.warn("Container {} is under-replicated, but no commands were"
          + " created to correct it", id);
}
return commands;
}
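  /**
   * Ask the placement policy for {@code requiredNodes} new target datanodes,
   * skipping the nodes in {@code excludedNodes}. The space requirement passed
   * to the policy is the larger of the container's used bytes and the
   * currently configured container size.
   */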
private List<DatanodeDetails> getTargetDatanodes(
List<DatanodeDetails> excludedNodes, ContainerInfo container,
int requiredNodes) throws IOException {
    // We should ensure that the target datanode has enough space for a
    // complete container, but since the configured container size may have
    // been reduced below the original size, be defensive and use the larger
    // of the two values.
final long dataSizeRequired =
Math.max(container.getUsedBytes(), currentContainerSize);
return containerPlacement
.chooseDatanodes(excludedNodes, null, requiredNodes, 0,
dataSizeRequired);
}
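  /**
   * Convert the list of missing replica indexes to the byte[] form expected
   * by {@link ReconstructECContainersCommand}.
   */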
private static byte[] int2byte(List<Integer> src) {
byte[] dst = new byte[src.size()];
for (int i = 0; i < src.size(); i++) {
dst[i] = src.get(i).byteValue();
}
return dst;
}
}