blob: f62afc8b777e0c21c76042806e74c7ef4b111d9f [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.container.replication;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONED;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONING;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.ENTERING_MAINTENANCE;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_MAINTENANCE;
/**
* This class provides a set of methods to test for over / under replication of
* EC containers, taking into account decommission / maintenance nodes,
* pending replications, pending deletes and the existing replicas.
*
* The intention of this class is to wrap the logic used to detect over and
* under replication to allow other areas to easily check the status of a
* container.
*
* For calculating under replication:
*
* * Assume that decommission replicas are already lost, as they
* will eventually go away.
* * Any pending deletes are treated as if they have already completed.
* * Pending adds are ignored as they may fail to create.
*
* Similar for over replication:
*
* * Assume decommissioned replicas are already lost.
* * Pending delete replicas will complete
* * Pending adds are ignored as they may not complete.
* * Maintenance copies are not considered until they are back to IN_SERVICE
*/
public class ECContainerReplicaCount implements ContainerReplicaCount {
// The container this replica count was built for.
private final ContainerInfo containerInfo;
// The container's replication config, cast to its EC form in the constructor.
private final ECReplicationConfig repConfig;
// Replica indexes of the pending ADD operations passed to the constructor.
private final List<Integer> pendingAdd;
// Replica indexes of the pending DELETE operations passed to the constructor.
private final List<Integer> pendingDelete;
// Requested maintenance redundancy, capped at the EC parity count.
private final int remainingMaintenanceRedundancy;
// Replica index -> number of replicas on nodes that are neither
// decommission(ing) nor in/entering maintenance, after subtracting pending
// deletes. Entries whose count reaches zero are removed.
private final Map<Integer, Integer> healthyIndexes = new HashMap<>();
// Replica index -> number of replicas on DECOMMISSIONING / DECOMMISSIONED
// nodes.
private final Map<Integer, Integer> decommissionIndexes = new HashMap<>();
// Replica index -> number of replicas on ENTERING_MAINTENANCE /
// IN_MAINTENANCE nodes.
private final Map<Integer, Integer> maintenanceIndexes = new HashMap<>();
// The replica set exactly as passed to the constructor (not copied).
private final Set<ContainerReplica> replicas;
/**
 * Builds the per-index replica counts for an EC container.
 *
 * Each replica is counted against its replica index in one of three maps
 * (decommission, maintenance or healthy) according to the persisted
 * operational state of its datanode. Pending deletes are then subtracted from
 * the healthy counts, as they are assumed to complete.
 *
 * @param containerInfo The container being examined. Its replication config
 *                      is cast to ECReplicationConfig.
 * @param replicas The known replicas of the container. The set is stored as
 *                 is, without copying.
 * @param replicaPendingOps Pending ADD / DELETE operations for the container.
 *                          Operations of any other type are ignored.
 * @param remainingMaintenanceRedundancy Desired redundancy to retain while
 *                 replicas are in maintenance. Capped at the parity count.
 * @throws IllegalArgumentException if any replica, pending delete or pending
 *                 add carries an index outside 1..requiredNodes.
 */
public ECContainerReplicaCount(ContainerInfo containerInfo,
    Set<ContainerReplica> replicas,
    List<ContainerReplicaOp> replicaPendingOps,
    int remainingMaintenanceRedundancy) {
  this.containerInfo = containerInfo;
  this.replicas = replicas;
  this.repConfig = (ECReplicationConfig) containerInfo.getReplicationConfig();
  this.pendingAdd = new ArrayList<>();
  this.pendingDelete = new ArrayList<>();
  this.remainingMaintenanceRedundancy
      = Math.min(repConfig.getParity(), remainingMaintenanceRedundancy);

  // Split the pending operations into adds and deletes by replica index.
  for (ContainerReplicaOp pendingOp : replicaPendingOps) {
    final ContainerReplicaOp.PendingOpType type = pendingOp.getOpType();
    if (type == ContainerReplicaOp.PendingOpType.ADD) {
      pendingAdd.add(pendingOp.getReplicaIndex());
    } else if (type == ContainerReplicaOp.PendingOpType.DELETE) {
      pendingDelete.add(pendingOp.getReplicaIndex());
    }
  }

  // Count every replica against its index in the map matching the
  // operational state of the node hosting it.
  for (ContainerReplica replica : replicas) {
    final HddsProtos.NodeOperationalState opState =
        replica.getDatanodeDetails().getPersistedOpState();
    final int replicaIndex = replica.getReplicaIndex();
    ensureIndexWithinBounds(replicaIndex, "replicaSet");

    final Map<Integer, Integer> target;
    if (opState == DECOMMISSIONED || opState == DECOMMISSIONING) {
      target = decommissionIndexes;
    } else if (opState == IN_MAINTENANCE || opState == ENTERING_MAINTENANCE) {
      target = maintenanceIndexes;
    } else {
      target = healthyIndexes;
    }
    target.merge(replicaIndex, 1, Integer::sum);
  }

  // Pending deletes are assumed to complete, so take one healthy copy away
  // for each of them. Returning null from the remapping function drops the
  // entry once its last copy is gone. Deletes for an index with no healthy
  // copy leave the map untouched.
  for (Integer deleteIndex : pendingDelete) {
    ensureIndexWithinBounds(deleteIndex, "pendingDelete");
    healthyIndexes.computeIfPresent(deleteIndex,
        (idx, copies) -> copies > 1 ? copies - 1 : null);
  }

  // Pending adds are only validated here; they do not alter any count.
  pendingAdd.forEach(addIndex ->
      ensureIndexWithinBounds(addIndex, "pendingAdd"));
}
/**
 * @return The ContainerInfo that was passed to the constructor.
 */
@Override
public ContainerInfo getContainer() {
  return this.containerInfo;
}
/**
 * @return The replica set that was passed to the constructor. This is the
 *         same Set instance, not a copy.
 */
@Override
public Set<ContainerReplica> getReplicas() {
  return this.replicas;
}
/**
 * @return The number of distinct replica indexes which have at least one
 *         replica on a decommissioning or decommissioned node. Several
 *         replicas of the same index count once.
 */
@Override
public int getDecommissionCount() {
  final Set<Integer> indexesInDecommission = decommissionIndexes.keySet();
  return indexesInDecommission.size();
}
/**
 * @return The number of distinct replica indexes which have at least one
 *         replica on a node that is entering or in maintenance. Several
 *         replicas of the same index count once.
 */
@Override
public int getMaintenanceCount() {
  final Set<Integer> indexesInMaintenance = maintenanceIndexes.keySet();
  return indexesInMaintenance.size();
}
/**
 * Get a set containing all decommissioning indexes, or an empty set if none
 * are decommissioning. Note it is possible for an index to be
 * decommissioning, healthy and in maintenance, if there are multiple copies
 * of it.
 *
 * The returned set is a read-only view. Handing out the map's own key set
 * would let a caller remove entries and silently change the results of the
 * other checks on this object.
 *
 * @return Unmodifiable set of indexes in decommission
 */
public Set<Integer> decommissioningIndexes() {
  return Collections.unmodifiableSet(decommissionIndexes.keySet());
}
/**
 * Get a new set containing the indexes which have a replica on a
 * decommissioning node but no healthy replica, or an empty set if there are
 * none. Only the healthy replicas are consulted, so an index returned here
 * may also have a copy on a maintenance node.
 *
 * @param includePendingAdd If true, indexes which already have a pending add
 *                          are left out of the result, as a new copy has
 *                          already been scheduled for them.
 * @return Set of indexes in decommission only.
 */
public Set<Integer> decommissioningOnlyIndexes(boolean includePendingAdd) {
  // Keep the decommission indexes which have no healthy copy.
  final Set<Integer> decommissionOnly = decommissionIndexes.keySet().stream()
      .filter(idx -> !healthyIndexes.containsKey(idx))
      .collect(Collectors.toCollection(HashSet::new));
  // Pending adds should eventually provide a copy, so drop them on request.
  if (includePendingAdd) {
    pendingAdd.forEach(decommissionOnly::remove);
  }
  return decommissionOnly;
}
/**
 * Get a set containing all maintenance indexes, or an empty set if none are
 * in maintenance. Note it is possible for an index to be
 * decommissioning, healthy and in maintenance, if there are multiple copies
 * of it.
 *
 * The returned set is a read-only view. Handing out the map's own key set
 * would let a caller remove entries and silently change the results of the
 * other checks on this object.
 *
 * @return Unmodifiable set of indexes in maintenance
 */
public Set<Integer> maintenanceIndexes() {
  return Collections.unmodifiableSet(maintenanceIndexes.keySet());
}
/**
 * Return true if there are insufficient replicas to recover this container,
 * i.e. fewer distinct replica indexes are present than the EC data count.
 * Healthy, decommission and maintenance replicas all count as present.
 * Healthy replicas with a pending delete have already been subtracted.
 *
 * @return True if the container cannot be recovered, false otherwise.
 */
@Override
public boolean isUnrecoverable() {
  final Set<Integer> presentIndexes = new HashSet<>(healthyIndexes.keySet());
  presentIndexes.addAll(decommissionIndexes.keySet());
  presentIndexes.addAll(maintenanceIndexes.keySet());
  final int dataCount = repConfig.getData();
  return presentIndexes.size() < dataCount;
}
/**
 * Returns an unsorted list of the replica indexes for which no copy is
 * available at all: not on a healthy node, not on a maintenance node and not
 * on a decommission node. These are the indexes which must be recovered via
 * EC reconstruction, rather than by copying a maintenance / decommission
 * replica.
 *
 * Replicas pending delete are assumed to be removed already. If the
 * container is sufficiently replicated without considering pending adds, an
 * empty list is returned.
 *
 * @param includePendingAdd If true, treat pending add replicas as if they
 *                          have completed successfully and omit their
 *                          indexes from the result, giving a view of the
 *                          potential future state of the container.
 * @return List of missing indexes which have no online copy.
 */
public List<Integer> unavailableIndexes(boolean includePendingAdd) {
  if (isSufficientlyReplicated(false)) {
    return Collections.emptyList();
  }

  // Start from every expected index that has no healthy copy.
  final int requiredNodes = repConfig.getRequiredNodes();
  final Set<Integer> unavailable = new HashSet<>();
  int candidate = 1;
  while (candidate <= requiredNodes) {
    if (!healthyIndexes.containsKey(candidate)) {
      unavailable.add(candidate);
    }
    candidate++;
  }

  // Pending adds should eventually supply a copy.
  if (includePendingAdd) {
    pendingAdd.forEach(unavailable::remove);
  }
  // Maintenance and decommission copies still exist and can be copied, so
  // only the indexes with no copy anywhere remain.
  maintenanceIndexes.keySet().forEach(unavailable::remove);
  decommissionIndexes.keySet().forEach(unavailable::remove);

  return new ArrayList<>(unavailable);
}
/**
 * Returns an unsorted list of the replica indexes that have a copy on a
 * maintenance node, but no healthy copy. This list can be used in
 * conjunction with additionalMaintenanceCopiesNeeded, to select replicas to
 * copy to ensure the maintenance redundancy goal is met.
 *
 * @return A new list of the indexes present only in maintenance.
 */
public List<Integer> maintenanceOnlyIndexes() {
  return maintenanceIndexes.keySet().stream()
      .filter(idx -> !healthyIndexes.containsKey(idx))
      .collect(Collectors.toCollection(ArrayList::new));
}
/**
 * Get the number of additional replicas needed to make the container
 * sufficiently replicated for maintenance. For EC-3-2, if there is a
 * remainingMaintenanceRedundancy of 1, and two replicas in maintenance only,
 * this will return 1, indicating one of the maintenance replicas must be
 * copied to an in-service node to meet the redundancy guarantee.
 *
 * @return The number of maintenance-only indexes in excess of the allowed
 *         maximum, or zero if the maximum is not exceeded.
 */
public int additionalMaintenanceCopiesNeeded() {
  final int maintenanceOnlyCount = maintenanceOnlyIndexes().size();
  final int excess = maintenanceOnlyCount - getMaxMaintenance();
  return excess > 0 ? excess : 0;
}
/**
 * If any index has more than one copy that is not in maintenance or
 * decommission, then the container is over replicated.
 * Pending adds are always ignored as they may fail to create.
 * Note it is possible for a container to be both over and under replicated
 * as it could have multiple copies of one index, but zero copies of another
 * index.
 *
 * @param includePendingDelete If true, treat replicas pending delete as if
 *                             they have deleted successfully, giving a view
 *                             of the future state of the container. If
 *                             false, the pending deletes are counted as if
 *                             the replicas were still present.
 * @return True if overReplicated, false otherwise.
 */
public boolean isOverReplicated(boolean includePendingDelete) {
  return getHealthyWithDelete(includePendingDelete).values().stream()
      .anyMatch(copies -> copies > 1);
}
/**
 * Checks for over replication without treating pending deletes as complete.
 *
 * @return The result of isOverReplicated(false).
 */
@Override
public boolean isOverReplicated() {
  final boolean includePendingDelete = false;
  return isOverReplicated(includePendingDelete);
}
/**
 * Return an unsorted list of any replica indexes which have more than one
 * healthy replica and are therefore over-replicated. Maintenance replicas
 * are ignored, since an excess that includes maintenance copies may be due
 * to replication which was needed to ensure sufficient redundancy for
 * maintenance. Pending adds are ignored as they may fail to complete.
 *
 * @param includePendingDelete If true, treat replicas pending delete as if
 *                             they have deleted successfully, giving a view
 *                             of the future state of the container. If
 *                             false, the pending deletes are counted as if
 *                             the replicas were still present.
 * @return A new list of the indexes which are over-replicated.
 */
public List<Integer> overReplicatedIndexes(boolean includePendingDelete) {
  return getHealthyWithDelete(includePendingDelete).entrySet().stream()
      .filter(entry -> entry.getValue() > 1)
      .map(Map.Entry::getKey)
      .collect(Collectors.toCollection(ArrayList::new));
}
/**
 * Get the healthy replica counts per index, either with or without the
 * pending deletes applied.
 *
 * @param includeDelete If true, return a read-only view of the healthy
 *                      counts, which already have the pending deletes
 *                      subtracted. If false, return a copy with one count
 *                      added back per pending delete.
 * @return Map of replica index to replica count.
 */
private Map<Integer, Integer> getHealthyWithDelete(boolean includeDelete) {
  if (includeDelete) {
    // Deletes were already subtracted from the healthy counts when this
    // object was built, so the healthy map can be used as is.
    return Collections.unmodifiableMap(healthyIndexes);
  }
  // NOTE(review): one count is added back for every pending delete, including
  // deletes which matched no healthy replica in the constructor - confirm
  // that this is intended.
  final Map<Integer, Integer> withDeletesRestored =
      new HashMap<>(healthyIndexes);
  for (Integer deleteIndex : pendingDelete) {
    withDeletesRestored.merge(deleteIndex, 1, Integer::sum);
  }
  return withDeletesRestored;
}
/**
 * The container is sufficiently replicated if the healthy indexes (which
 * already exclude pending deletes) give a complete set of container indexes.
 * If not, the maintenance indexes are also checked - the container is still
 * sufficiently replicated if the complete set is made up of healthy +
 * maintenance and the number of online indexes is at least the data count
 * plus the remaining maintenance redundancy. Decommission replicas are not
 * considered.
 *
 * @param includePendingAdd If true, treat pending add replicas as if they
 *                          have completed successfully, showing the
 *                          potential future state of the container.
 * @return True if sufficiently replicated, false otherwise.
 */
public boolean isSufficientlyReplicated(boolean includePendingAdd) {
  final Map<Integer, Integer> online = new HashMap<>(healthyIndexes);
  if (includePendingAdd) {
    for (Integer addIndex : pendingAdd) {
      online.merge(addIndex, 1, Integer::sum);
    }
  }
  if (hasFullSetOfIndexes(online)) {
    return true;
  }

  // The online copies alone are not enough. Check whether the maintenance
  // copies complete the set.
  final Map<Integer, Integer> onlinePlusMaintenance = new HashMap<>(online);
  for (Map.Entry<Integer, Integer> entry : maintenanceIndexes.entrySet()) {
    onlinePlusMaintenance.merge(entry.getKey(), entry.getValue(),
        Integer::sum);
  }
  if (!hasFullSetOfIndexes(onlinePlusMaintenance)) {
    return false;
  }
  // Also ensure there are not too many indexes relying on maintenance.
  final int minimumOnline =
      repConfig.getData() + remainingMaintenanceRedundancy;
  return online.size() >= minimumOnline;
}
/**
 * Checks for sufficient replication without treating pending adds as
 * complete.
 *
 * @return The result of isSufficientlyReplicated(false).
 */
@Override
public boolean isSufficientlyReplicated() {
  final boolean includePendingAdd = false;
  return isSufficientlyReplicated(includePendingAdd);
}
/**
 * Check if the map has an entry for every expected replica index, by
 * comparing the number of entries with the number of nodes required by the
 * replication config. Only the size of the map is examined; the counts
 * stored against each index are not checked.
 *
 * @param indexSet A map representing the replica index and count of the
 *                 replicas for that index.
 * @return True if there is a full set of indexes, false otherwise.
 */
private boolean hasFullSetOfIndexes(Map<Integer, Integer> indexSet) {
  final int expectedIndexes = repConfig.getRequiredNodes();
  return expectedIndexes == indexSet.size();
}
/**
 * Returns the maximum number of replicas that are allowed to be only on a
 * maintenance node, with no other copies on in-service nodes.
 *
 * @return The parity count minus the remaining maintenance redundancy, or
 *         zero if that difference is negative.
 */
private int getMaxMaintenance() {
  final int allowed = repConfig.getParity() - remainingMaintenanceRedundancy;
  return allowed < 0 ? 0 : allowed;
}
/**
 * Validate that the replica index is between 1 and the max expected
 * replica index for the replication config, eg 5 for 3-2, 9 for 6-3 etc.
 *
 * @param index The replica index to check.
 * @param setName Name of the collection the index was taken from. It is only
 *                used to build the exception message.
 * @throws IllegalArgumentException if the index is out of bounds.
 */
private void ensureIndexWithinBounds(Integer index, String setName) {
  final int maxIndex = repConfig.getRequiredNodes();
  if (index < 1 || index > maxIndex) {
    // The leading space before "must" keeps the container ID from running
    // into the rest of the message.
    throw new IllegalArgumentException("Replica Index in " + setName
        + " for containerID " + containerInfo.getContainerID()
        + " must be between 1 and " + maxIndex
        + ". But the given index is: " + index);
  }
}
}