| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * <p> |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * <p> |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.hdds.scm.container; |
| |
| import java.io.IOException; |
| import java.time.Duration; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.LinkedHashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.StringJoiner; |
| import java.util.UUID; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.function.Consumer; |
| import java.util.function.Predicate; |
| import java.util.stream.Collectors; |
| |
| import org.apache.hadoop.hdds.conf.Config; |
| import org.apache.hadoop.hdds.conf.ConfigGroup; |
| import org.apache.hadoop.hdds.conf.ConfigType; |
| import org.apache.hadoop.hdds.protocol.DatanodeDetails; |
| import org.apache.hadoop.hdds.protocol.proto.HddsProtos; |
| import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; |
| import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState; |
| import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State; |
| import org.apache.hadoop.hdds.scm.ContainerPlacementStatus; |
| import org.apache.hadoop.hdds.scm.PlacementPolicy; |
| import org.apache.hadoop.hdds.scm.events.SCMEvents; |
| import org.apache.hadoop.hdds.scm.node.NodeManager; |
| import org.apache.hadoop.hdds.scm.node.NodeStatus; |
| import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException; |
| import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager.SafeModeStatus; |
| import org.apache.hadoop.hdds.server.events.EventHandler; |
| import org.apache.hadoop.hdds.server.events.EventPublisher; |
| import org.apache.hadoop.metrics2.MetricsCollector; |
| import org.apache.hadoop.metrics2.MetricsInfo; |
| import org.apache.hadoop.metrics2.MetricsSource; |
| import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; |
| import org.apache.hadoop.ozone.lock.LockManager; |
| import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand; |
| import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode; |
| import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand; |
| import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand; |
| import org.apache.hadoop.ozone.protocol.commands.SCMCommand; |
| import org.apache.hadoop.util.ExitUtil; |
| import org.apache.hadoop.util.Time; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.protobuf.GeneratedMessage; |
| import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; |
| import static org.apache.hadoop.hdds.conf.ConfigTag.OZONE; |
| import static org.apache.hadoop.hdds.conf.ConfigTag.SCM; |
| import org.apache.ratis.util.Preconditions; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * Replication Manager (RM) is the one which is responsible for making sure |
| * that the containers are properly replicated. Replication Manager deals only |
| * with Quasi Closed / Closed container. |
| */ |
| public class ReplicationManager |
| implements MetricsSource, EventHandler<SafeModeStatus> { |
| |
| public static final Logger LOG = |
| LoggerFactory.getLogger(ReplicationManager.class); |
| |
| public static final String METRICS_SOURCE_NAME = "SCMReplicationManager"; |
| |
| /** |
| * Reference to the ContainerManager. |
| */ |
| private final ContainerManager containerManager; |
| |
| /** |
| * PlacementPolicy which is used to identify where a container |
| * should be replicated. |
| */ |
| private final PlacementPolicy containerPlacement; |
| |
| /** |
| * EventPublisher to fire Replicate and Delete container events. |
| */ |
| private final EventPublisher eventPublisher; |
| |
| /** |
| * Used for locking a container using its ID while processing it. |
| */ |
| private final LockManager<ContainerID> lockManager; |
| |
| /** |
| * Used to lookup the health of a nodes or the nodes operational state. |
| */ |
| private final NodeManager nodeManager; |
| |
| /** |
| * This is used for tracking container replication commands which are issued |
| * by ReplicationManager and not yet complete. |
| */ |
| private final Map<ContainerID, List<InflightAction>> inflightReplication; |
| |
| /** |
| * This is used for tracking container deletion commands which are issued |
| * by ReplicationManager and not yet complete. |
| */ |
| private final Map<ContainerID, List<InflightAction>> inflightDeletion; |
| |
| /** |
| * ReplicationManager specific configuration. |
| */ |
| private final ReplicationManagerConfiguration conf; |
| |
| /** |
| * ReplicationMonitor thread is the one which wakes up at configured |
| * interval and processes all the containers. |
| */ |
| private Thread replicationMonitor; |
| |
| /** |
| * Flag used for checking if the ReplicationMonitor thread is running or |
| * not. |
| */ |
| private volatile boolean running; |
| |
| /** |
| * Minimum number of replica in a healthy state for maintenance. |
| */ |
| private int minHealthyForMaintenance; |
| |
| /** |
| * Constructs ReplicationManager instance with the given configuration. |
| * |
| * @param conf OzoneConfiguration |
| * @param containerManager ContainerManager |
| * @param containerPlacement PlacementPolicy |
| * @param eventPublisher EventPublisher |
| */ |
| public ReplicationManager(final ReplicationManagerConfiguration conf, |
| final ContainerManager containerManager, |
| final PlacementPolicy containerPlacement, |
| final EventPublisher eventPublisher, |
| final LockManager<ContainerID> lockManager, |
| final NodeManager nodeManager) { |
| this.containerManager = containerManager; |
| this.containerPlacement = containerPlacement; |
| this.eventPublisher = eventPublisher; |
| this.lockManager = lockManager; |
| this.nodeManager = nodeManager; |
| this.conf = conf; |
| this.running = false; |
| this.inflightReplication = new ConcurrentHashMap<>(); |
| this.inflightDeletion = new ConcurrentHashMap<>(); |
| this.minHealthyForMaintenance = conf.getMaintenanceReplicaMinimum(); |
| } |
| |
| /** |
| * Starts Replication Monitor thread. |
| */ |
| public synchronized void start() { |
| |
| if (!isRunning()) { |
| DefaultMetricsSystem.instance().register(METRICS_SOURCE_NAME, |
| "SCM Replication manager (closed container replication) related " |
| + "metrics", |
| this); |
| LOG.info("Starting Replication Monitor Thread."); |
| running = true; |
| replicationMonitor = new Thread(this::run); |
| replicationMonitor.setName("ReplicationMonitor"); |
| replicationMonitor.setDaemon(true); |
| replicationMonitor.start(); |
| } else { |
| LOG.info("Replication Monitor Thread is already running."); |
| } |
| } |
| |
| /** |
| * Returns true if the Replication Monitor Thread is running. |
| * |
| * @return true if running, false otherwise |
| */ |
| public boolean isRunning() { |
| if (!running) { |
| synchronized (this) { |
| return replicationMonitor != null |
| && replicationMonitor.isAlive(); |
| } |
| } |
| return true; |
| } |
| |
| /** |
| * Process all the containers immediately. |
| */ |
| @VisibleForTesting |
| @SuppressFBWarnings(value="NN_NAKED_NOTIFY", |
| justification="Used only for testing") |
| public synchronized void processContainersNow() { |
| notifyAll(); |
| } |
| |
| /** |
| * Stops Replication Monitor thread. |
| */ |
| public synchronized void stop() { |
| if (running) { |
| LOG.info("Stopping Replication Monitor Thread."); |
| inflightReplication.clear(); |
| inflightDeletion.clear(); |
| running = false; |
| DefaultMetricsSystem.instance().unregisterSource(METRICS_SOURCE_NAME); |
| notifyAll(); |
| } else { |
| LOG.info("Replication Monitor Thread is not running."); |
| } |
| } |
| |
| /** |
| * ReplicationMonitor thread runnable. This wakes up at configured |
| * interval and processes all the containers in the system. |
| */ |
| private synchronized void run() { |
| try { |
| while (running) { |
| final long start = Time.monotonicNow(); |
| final Set<ContainerID> containerIds = |
| containerManager.getContainerIDs(); |
| containerIds.forEach(this::processContainer); |
| |
| LOG.info("Replication Monitor Thread took {} milliseconds for" + |
| " processing {} containers.", Time.monotonicNow() - start, |
| containerIds.size()); |
| |
| wait(conf.getInterval()); |
| } |
| } catch (Throwable t) { |
| // When we get runtime exception, we should terminate SCM. |
| LOG.error("Exception in Replication Monitor Thread.", t); |
| ExitUtil.terminate(1, t); |
| } |
| } |
| |
| /** |
| * Process the given container. |
| * |
| * @param id ContainerID |
| */ |
| private void processContainer(ContainerID id) { |
| lockManager.writeLock(id); |
| try { |
| final ContainerInfo container = containerManager.getContainer(id); |
| final Set<ContainerReplica> replicas = containerManager |
| .getContainerReplicas(container.containerID()); |
| final LifeCycleState state = container.getState(); |
| |
| /* |
| * We don't take any action if the container is in OPEN state and |
| * the container is healthy. If the container is not healthy, i.e. |
| * the replicas are not in OPEN state, send CLOSE_CONTAINER command. |
| */ |
| if (state == LifeCycleState.OPEN) { |
| if (!isOpenContainerHealthy(container, replicas)) { |
| eventPublisher.fireEvent(SCMEvents.CLOSE_CONTAINER, id); |
| } |
| return; |
| } |
| |
| /* |
| * If the container is in CLOSING state, the replicas can either |
| * be in OPEN or in CLOSING state. In both of this cases |
| * we have to resend close container command to the datanodes. |
| */ |
| if (state == LifeCycleState.CLOSING) { |
| replicas.forEach(replica -> sendCloseCommand( |
| container, replica.getDatanodeDetails(), false)); |
| return; |
| } |
| |
| /* |
| * If the container is in QUASI_CLOSED state, check and close the |
| * container if possible. |
| */ |
| if (state == LifeCycleState.QUASI_CLOSED && |
| canForceCloseContainer(container, replicas)) { |
| forceCloseContainer(container, replicas); |
| return; |
| } |
| |
| /* |
| * Before processing the container we have to reconcile the |
| * inflightReplication and inflightDeletion actions. |
| * |
| * We remove the entry from inflightReplication and inflightDeletion |
| * list, if the operation is completed or if it has timed out. |
| */ |
| updateInflightAction(container, inflightReplication, |
| action -> replicas.stream() |
| .anyMatch(r -> r.getDatanodeDetails().equals(action.datanode))); |
| |
| updateInflightAction(container, inflightDeletion, |
| action -> replicas.stream() |
| .noneMatch(r -> r.getDatanodeDetails().equals(action.datanode))); |
| |
| /* |
| * If container is under deleting and all it's replicas are deleted, then |
| * make the container as CLEANED, or resend the delete replica command if |
| * needed. |
| */ |
| if (state == LifeCycleState.DELETING) { |
| handleContainerUnderDelete(container, replicas); |
| return; |
| } |
| |
| /** |
| * We don't need to take any action for a DELETE container - eventually |
| * it will be removed from SCM. |
| */ |
| if (state == LifeCycleState.DELETED) { |
| return; |
| } |
| |
| ContainerReplicaCount replicaSet = |
| getContainerReplicaCount(container, replicas); |
| ContainerPlacementStatus placementStatus = getPlacementStatus( |
| replicas, container.getReplicationFactor().getNumber()); |
| |
| /* |
| * We don't have to take any action if the container is healthy. |
| * |
| * According to ReplicationMonitor container is considered healthy if |
| * the container is either in QUASI_CLOSED or in CLOSED state and has |
| * exact number of replicas in the same state. |
| */ |
| if (isContainerEmpty(container, replicas)) { |
| /* |
| * If container is empty, schedule task to delete the container. |
| */ |
| deleteContainerReplicas(container, replicas); |
| return; |
| } |
| |
| /* |
| * Check if the container is under replicated and take appropriate |
| * action. |
| */ |
| if (!replicaSet.isSufficientlyReplicated() |
| || !placementStatus.isPolicySatisfied()) { |
| handleUnderReplicatedContainer(container, replicaSet, placementStatus); |
| return; |
| } |
| |
| /* |
| * Check if the container is over replicated and take appropriate |
| * action. |
| */ |
| if (replicaSet.isOverReplicated()) { |
| handleOverReplicatedContainer(container, replicaSet); |
| return; |
| } |
| |
| /* |
| If we get here, the container is not over replicated or under replicated |
| but it may be "unhealthy", which means it has one or more replica which |
| are not in the same state as the container itself. |
| */ |
| if (!replicaSet.isHealthy()) { |
| handleUnstableContainer(container, replicas); |
| } |
| |
| } catch (ContainerNotFoundException ex) { |
| LOG.warn("Missing container {}.", id); |
| } catch (Exception ex) { |
| LOG.warn("Process container {} error: ", id, ex); |
| } finally { |
| lockManager.writeUnlock(id); |
| } |
| } |
| |
| /** |
| * Reconciles the InflightActions for a given container. |
| * |
| * @param container Container to update |
| * @param inflightActions inflightReplication (or) inflightDeletion |
| * @param filter filter to check if the operation is completed |
| */ |
| private void updateInflightAction(final ContainerInfo container, |
| final Map<ContainerID, List<InflightAction>> inflightActions, |
| final Predicate<InflightAction> filter) { |
| final ContainerID id = container.containerID(); |
| final long deadline = Time.monotonicNow() - conf.getEventTimeout(); |
| if (inflightActions.containsKey(id)) { |
| final List<InflightAction> actions = inflightActions.get(id); |
| |
| Iterator<InflightAction> iter = actions.iterator(); |
| while(iter.hasNext()) { |
| try { |
| InflightAction a = iter.next(); |
| NodeState health = nodeManager.getNodeStatus(a.datanode) |
| .getHealth(); |
| if (health != NodeState.HEALTHY || a.time < deadline |
| || filter.test(a)) { |
| iter.remove(); |
| } |
| } catch (NodeNotFoundException e) { |
| // Should not happen, but if it does, just remove the action as the |
| // node somehow does not exist; |
| iter.remove(); |
| } |
| } |
| if (actions.isEmpty()) { |
| inflightActions.remove(id); |
| } |
| } |
| } |
| |
| /** |
| * Returns the number replica which are pending creation for the given |
| * container ID. |
| * @param id The ContainerID for which to check the pending replica |
| * @return The number of inflight additions or zero if none |
| */ |
| private int getInflightAdd(final ContainerID id) { |
| return inflightReplication.getOrDefault(id, Collections.emptyList()).size(); |
| } |
| |
| /** |
| * Returns the number replica which are pending delete for the given |
| * container ID. |
| * @param id The ContainerID for which to check the pending replica |
| * @return The number of inflight deletes or zero if none |
| */ |
| private int getInflightDel(final ContainerID id) { |
| return inflightDeletion.getOrDefault(id, Collections.emptyList()).size(); |
| } |
| |
| /** |
| * Returns true if the container is empty and CLOSED. |
| * |
| * @param container Container to check |
| * @param replicas Set of ContainerReplicas |
| * @return true if the container is empty, false otherwise |
| */ |
| private boolean isContainerEmpty(final ContainerInfo container, |
| final Set<ContainerReplica> replicas) { |
| return container.getState() == LifeCycleState.CLOSED && |
| (container.getUsedBytes() == 0 && container.getNumberOfKeys() == 0) && |
| replicas.stream().allMatch(r -> r.getState() == State.CLOSED && |
| r.getBytesUsed() == 0 && r.getKeyCount() == 0); |
| } |
| |
| /** |
| * Given a ContainerID, lookup the ContainerInfo and then return a |
| * ContainerReplicaCount object for the container. |
| * @param containerID The ID of the container |
| * @return ContainerReplicaCount for the given container |
| * @throws ContainerNotFoundException |
| */ |
| public ContainerReplicaCount getContainerReplicaCount(ContainerID containerID) |
| throws ContainerNotFoundException { |
| ContainerInfo container = containerManager.getContainer(containerID); |
| return getContainerReplicaCount(container); |
| } |
| |
| /** |
| * Given a container, obtain the set of known replica for it, and return a |
| * ContainerReplicaCount object. This object will contain the set of replica |
| * as well as all information required to determine if the container is over |
| * or under replicated, including the delta of replica required to repair the |
| * over or under replication. |
| * |
| * @param container The container to create a ContainerReplicaCount for |
| * @return ContainerReplicaCount representing the replicated state of the |
| * container. |
| * @throws ContainerNotFoundException |
| */ |
| public ContainerReplicaCount getContainerReplicaCount(ContainerInfo container) |
| throws ContainerNotFoundException { |
| lockManager.readLock(container.containerID()); |
| try { |
| final Set<ContainerReplica> replica = containerManager |
| .getContainerReplicas(container.containerID()); |
| return getContainerReplicaCount(container, replica); |
| } finally { |
| lockManager.readUnlock(container.containerID()); |
| } |
| } |
| |
| /** |
| * Given a container and its set of replicas, create and return a |
| * ContainerReplicaCount representing the container. |
| * |
| * @param container The container for which to construct a |
| * ContainerReplicaCount |
| * @param replica The set of existing replica for this container |
| * @return ContainerReplicaCount representing the current state of the |
| * container |
| */ |
| private ContainerReplicaCount getContainerReplicaCount( |
| ContainerInfo container, Set<ContainerReplica> replica) { |
| return new ContainerReplicaCount( |
| container, |
| replica, |
| getInflightAdd(container.containerID()), |
| getInflightDel(container.containerID()), |
| container.getReplicationFactor().getNumber(), |
| minHealthyForMaintenance); |
| } |
| |
| /** |
| * Returns true if more than 50% of the container replicas with unique |
| * originNodeId are in QUASI_CLOSED state. |
| * |
| * @param container Container to check |
| * @param replicas Set of ContainerReplicas |
| * @return true if we can force close the container, false otherwise |
| */ |
| private boolean canForceCloseContainer(final ContainerInfo container, |
| final Set<ContainerReplica> replicas) { |
| Preconditions.assertTrue(container.getState() == |
| LifeCycleState.QUASI_CLOSED); |
| final int replicationFactor = container.getReplicationFactor().getNumber(); |
| final long uniqueQuasiClosedReplicaCount = replicas.stream() |
| .filter(r -> r.getState() == State.QUASI_CLOSED) |
| .map(ContainerReplica::getOriginDatanodeId) |
| .distinct() |
| .count(); |
| return uniqueQuasiClosedReplicaCount > (replicationFactor / 2); |
| } |
| |
| /** |
| * Delete the container and its replicas. |
| * |
| * @param container ContainerInfo |
| * @param replicas Set of ContainerReplicas |
| */ |
| private void deleteContainerReplicas(final ContainerInfo container, |
| final Set<ContainerReplica> replicas) throws IOException { |
| Preconditions.assertTrue(container.getState() == |
| LifeCycleState.CLOSED); |
| Preconditions.assertTrue(container.getNumberOfKeys() == 0); |
| Preconditions.assertTrue(container.getUsedBytes() == 0); |
| |
| replicas.stream().forEach(rp -> { |
| Preconditions.assertTrue(rp.getState() == State.CLOSED); |
| Preconditions.assertTrue(rp.getBytesUsed() == 0); |
| Preconditions.assertTrue(rp.getKeyCount() == 0); |
| sendDeleteCommand(container, rp.getDatanodeDetails(), false); |
| }); |
| containerManager.updateContainerState(container.containerID(), |
| HddsProtos.LifeCycleEvent.DELETE); |
| LOG.debug("Deleting empty container {} replicas,", container.containerID()); |
| } |
| |
| /** |
| * Handle the container which is under delete. |
| * |
| * @param container ContainerInfo |
| * @param replicas Set of ContainerReplicas |
| */ |
| private void handleContainerUnderDelete(final ContainerInfo container, |
| final Set<ContainerReplica> replicas) throws IOException { |
| if (replicas.size() == 0) { |
| containerManager.updateContainerState(container.containerID(), |
| HddsProtos.LifeCycleEvent.CLEANUP); |
| LOG.debug("Container {} state changes to DELETED", |
| container.containerID()); |
| } else { |
| // Check whether to resend the delete replica command |
| final List<DatanodeDetails> deletionInFlight = inflightDeletion |
| .getOrDefault(container.containerID(), Collections.emptyList()) |
| .stream() |
| .map(action -> action.datanode) |
| .collect(Collectors.toList()); |
| Set<ContainerReplica> filteredReplicas = replicas.stream().filter( |
| r -> !deletionInFlight.contains(r.getDatanodeDetails())) |
| .collect(Collectors.toSet()); |
| // Resend the delete command |
| if (filteredReplicas.size() > 0) { |
| filteredReplicas.stream().forEach(rp -> { |
| sendDeleteCommand(container, rp.getDatanodeDetails(), false); |
| }); |
| LOG.debug("Resend delete Container {} command", |
| container.containerID()); |
| } |
| } |
| } |
| |
| /** |
| * Force close the container replica(s) with highest sequence Id. |
| * |
| * <p> |
| * Note: We should force close the container only if >50% (quorum) |
| * of replicas with unique originNodeId are in QUASI_CLOSED state. |
| * </p> |
| * |
| * @param container ContainerInfo |
| * @param replicas Set of ContainerReplicas |
| */ |
| private void forceCloseContainer(final ContainerInfo container, |
| final Set<ContainerReplica> replicas) { |
| Preconditions.assertTrue(container.getState() == |
| LifeCycleState.QUASI_CLOSED); |
| |
| final List<ContainerReplica> quasiClosedReplicas = replicas.stream() |
| .filter(r -> r.getState() == State.QUASI_CLOSED) |
| .collect(Collectors.toList()); |
| |
| final Long sequenceId = quasiClosedReplicas.stream() |
| .map(ContainerReplica::getSequenceId) |
| .max(Long::compare) |
| .orElse(-1L); |
| |
| LOG.info("Force closing container {} with BCSID {}," + |
| " which is in QUASI_CLOSED state.", |
| container.containerID(), sequenceId); |
| |
| quasiClosedReplicas.stream() |
| .filter(r -> sequenceId != -1L) |
| .filter(replica -> replica.getSequenceId().equals(sequenceId)) |
| .forEach(replica -> sendCloseCommand( |
| container, replica.getDatanodeDetails(), true)); |
| } |
| |
| /** |
| * If the given container is under replicated, identify a new set of |
| * datanode(s) to replicate the container using PlacementPolicy |
| * and send replicate container command to the identified datanode(s). |
| * |
| * @param container ContainerInfo |
| * @param replicaSet An instance of ContainerReplicaCount, containing the |
| * current replica count and inflight adds and deletes |
| */ |
| private void handleUnderReplicatedContainer(final ContainerInfo container, |
| final ContainerReplicaCount replicaSet, |
| final ContainerPlacementStatus placementStatus) { |
| LOG.debug("Handling under-replicated container: {}", |
| container.getContainerID()); |
| Set<ContainerReplica> replicas = replicaSet.getReplica(); |
| try { |
| |
| if (replicaSet.isSufficientlyReplicated() |
| && placementStatus.isPolicySatisfied()) { |
| LOG.info("The container {} with replicas {} is sufficiently "+ |
| "replicated and is not mis-replicated", |
| container.getContainerID(), replicaSet); |
| return; |
| } |
| int repDelta = replicaSet.additionalReplicaNeeded(); |
| final ContainerID id = container.containerID(); |
| final List<DatanodeDetails> deletionInFlight = inflightDeletion |
| .getOrDefault(id, Collections.emptyList()) |
| .stream() |
| .map(action -> action.datanode) |
| .collect(Collectors.toList()); |
| final List<DatanodeDetails> replicationInFlight = inflightReplication |
| .getOrDefault(id, Collections.emptyList()) |
| .stream() |
| .map(action -> action.datanode) |
| .collect(Collectors.toList()); |
| final List<DatanodeDetails> source = replicas.stream() |
| .filter(r -> |
| r.getState() == State.QUASI_CLOSED || |
| r.getState() == State.CLOSED) |
| // Exclude stale and dead nodes. This is particularly important for |
| // maintenance nodes, as the replicas will remain present in the |
| // container manager, even when they go dead. |
| .filter(r -> |
| getNodeStatus(r.getDatanodeDetails()).isHealthy()) |
| .filter(r -> !deletionInFlight.contains(r.getDatanodeDetails())) |
| .sorted((r1, r2) -> r2.getSequenceId().compareTo(r1.getSequenceId())) |
| .map(ContainerReplica::getDatanodeDetails) |
| .collect(Collectors.toList()); |
| if (source.size() > 0) { |
| final int replicationFactor = container |
| .getReplicationFactor().getNumber(); |
| // Want to check if the container is mis-replicated after considering |
| // inflight add and delete. |
| // Create a new list from source (healthy replicas minus pending delete) |
| List<DatanodeDetails> targetReplicas = new ArrayList<>(source); |
| // Then add any pending additions |
| targetReplicas.addAll(replicationInFlight); |
| final ContainerPlacementStatus inFlightplacementStatus = |
| containerPlacement.validateContainerPlacement( |
| targetReplicas, replicationFactor); |
| final int misRepDelta = inFlightplacementStatus.misReplicationCount(); |
| final int replicasNeeded |
| = repDelta < misRepDelta ? misRepDelta : repDelta; |
| if (replicasNeeded <= 0) { |
| LOG.debug("Container {} meets replication requirement with " + |
| "inflight replicas", id); |
| return; |
| } |
| |
| final List<DatanodeDetails> excludeList = replicas.stream() |
| .map(ContainerReplica::getDatanodeDetails) |
| .collect(Collectors.toList()); |
| excludeList.addAll(replicationInFlight); |
| final List<DatanodeDetails> selectedDatanodes = containerPlacement |
| .chooseDatanodes(excludeList, null, replicasNeeded, |
| container.getUsedBytes()); |
| if (repDelta > 0) { |
| LOG.info("Container {} is under replicated. Expected replica count" + |
| " is {}, but found {}.", id, replicationFactor, |
| replicationFactor - repDelta); |
| } |
| int newMisRepDelta = misRepDelta; |
| if (misRepDelta > 0) { |
| LOG.info("Container: {}. {}", |
| id, placementStatus.misReplicatedReason()); |
| // Check if the new target nodes (original plus newly selected nodes) |
| // makes the placement policy valid. |
| targetReplicas.addAll(selectedDatanodes); |
| newMisRepDelta = containerPlacement.validateContainerPlacement( |
| targetReplicas, replicationFactor).misReplicationCount(); |
| } |
| if (repDelta > 0 || newMisRepDelta < misRepDelta) { |
| // Only create new replicas if we are missing a replicas or |
| // the number of pending mis-replication has improved. No point in |
| // creating new replicas for mis-replicated containers unless it |
| // improves things. |
| for (DatanodeDetails datanode : selectedDatanodes) { |
| sendReplicateCommand(container, datanode, source); |
| } |
| } else { |
| LOG.warn("Container {} is mis-replicated, requiring {} additional " + |
| "replicas. After selecting new nodes, mis-replication has not " + |
| "improved. No additional replicas will be scheduled", |
| id, misRepDelta); |
| } |
| } else { |
| LOG.warn("Cannot replicate container {}, no healthy replica found.", |
| container.containerID()); |
| } |
| } catch (IOException | IllegalStateException ex) { |
| LOG.warn("Exception while replicating container {}.", |
| container.getContainerID(), ex); |
| } |
| } |
| |
| /** |
| * If the given container is over replicated, identify the datanode(s) |
| * to delete the container and send delete container command to the |
| * identified datanode(s). |
| * |
| * @param container ContainerInfo |
| * @param replicaSet An instance of ContainerReplicaCount, containing the |
| * current replica count and inflight adds and deletes |
| */ |
| private void handleOverReplicatedContainer(final ContainerInfo container, |
| final ContainerReplicaCount replicaSet) { |
| |
| final Set<ContainerReplica> replicas = replicaSet.getReplica(); |
| final ContainerID id = container.containerID(); |
| final int replicationFactor = container.getReplicationFactor().getNumber(); |
| int excess = replicaSet.additionalReplicaNeeded() * -1; |
| if (excess > 0) { |
| |
| LOG.info("Container {} is over replicated. Expected replica count" + |
| " is {}, but found {}.", id, replicationFactor, |
| replicationFactor + excess); |
| |
| final List<ContainerReplica> eligibleReplicas = new ArrayList<>(replicas); |
| |
| final Map<UUID, ContainerReplica> uniqueReplicas = |
| new LinkedHashMap<>(); |
| |
| if (container.getState() != LifeCycleState.CLOSED) { |
| replicas.stream() |
| .filter(r -> compareState(container.getState(), r.getState())) |
| .forEach(r -> uniqueReplicas |
| .putIfAbsent(r.getOriginDatanodeId(), r)); |
| |
| eligibleReplicas.removeAll(uniqueReplicas.values()); |
| } |
| // Replica which are maintenance or decommissioned are not eligible to |
| // be removed, as they do not count toward over-replication and they |
| // also many not be available |
| eligibleReplicas.removeIf(r -> |
| r.getDatanodeDetails().getPersistedOpState() != |
| HddsProtos.NodeOperationalState.IN_SERVICE); |
| |
| final List<ContainerReplica> unhealthyReplicas = eligibleReplicas |
| .stream() |
| .filter(r -> !compareState(container.getState(), r.getState())) |
| .collect(Collectors.toList()); |
| |
| // If there are unhealthy replicas, then we should remove them even if it |
| // makes the container violate the placement policy, as excess unhealthy |
| // containers are not really useful. It will be corrected later as a |
| // mis-replicated container will be seen as under-replicated. |
| for (ContainerReplica r : unhealthyReplicas) { |
| if (excess > 0) { |
| sendDeleteCommand(container, r.getDatanodeDetails(), true); |
| excess -= 1; |
| } else { |
| break; |
| } |
| } |
| // After removing all unhealthy replicas, if the container is still over |
| // replicated then we need to check if it is already mis-replicated. |
| // If it is, we do no harm by removing excess replicas. However, if it is |
| // not mis-replicated, then we can only remove replicas if they don't |
| // make the container become mis-replicated. |
| if (excess > 0) { |
| eligibleReplicas.removeAll(unhealthyReplicas); |
| Set<ContainerReplica> eligibleSet = new HashSet<>(eligibleReplicas); |
| ContainerPlacementStatus ps = |
| getPlacementStatus(eligibleSet, replicationFactor); |
| for (ContainerReplica r : eligibleReplicas) { |
| if (excess <= 0) { |
| break; |
| } |
| // First remove the replica we are working on from the set, and then |
| // check if the set is now mis-replicated. |
| eligibleSet.remove(r); |
| ContainerPlacementStatus nowPS = |
| getPlacementStatus(eligibleSet, replicationFactor); |
| if ((!ps.isPolicySatisfied() |
| && nowPS.actualPlacementCount() == ps.actualPlacementCount()) |
| || (ps.isPolicySatisfied() && nowPS.isPolicySatisfied())) { |
| // Remove the replica if the container was already unsatisfied |
| // and losing this replica keep actual placement count unchanged. |
| // OR if losing this replica still keep satisfied |
| sendDeleteCommand(container, r.getDatanodeDetails(), true); |
| excess -= 1; |
| continue; |
| } |
| // If we decided not to remove this replica, put it back into the set |
| eligibleSet.add(r); |
| } |
| if (excess > 0) { |
| LOG.info("The container {} is over replicated with {} excess " + |
| "replica. The excess replicas cannot be removed without " + |
| "violating the placement policy", container, excess); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Given a set of ContainerReplica, transform it to a list of DatanodeDetails |
| * and then check if the list meets the container placement policy. |
| * @param replicas List of containerReplica |
| * @param replicationFactor Expected Replication Factor of the containe |
| * @return ContainerPlacementStatus indicating if the policy is met or not |
| */ |
| private ContainerPlacementStatus getPlacementStatus( |
| Set<ContainerReplica> replicas, int replicationFactor) { |
| List<DatanodeDetails> replicaDns = replicas.stream() |
| .map(c -> c.getDatanodeDetails()).collect(Collectors.toList()); |
| return containerPlacement.validateContainerPlacement( |
| replicaDns, replicationFactor); |
| } |
| |
| /** |
| * Handles unstable container. |
| * A container is inconsistent if any of the replica state doesn't |
| * match the container state. We have to take appropriate action |
| * based on state of the replica. |
| * |
| * @param container ContainerInfo |
| * @param replicas Set of ContainerReplicas |
| */ |
| private void handleUnstableContainer(final ContainerInfo container, |
| final Set<ContainerReplica> replicas) { |
| // Find unhealthy replicas |
| List<ContainerReplica> unhealthyReplicas = replicas.stream() |
| .filter(r -> !compareState(container.getState(), r.getState())) |
| .collect(Collectors.toList()); |
| |
| Iterator<ContainerReplica> iterator = unhealthyReplicas.iterator(); |
| while (iterator.hasNext()) { |
| final ContainerReplica replica = iterator.next(); |
| final State state = replica.getState(); |
| if (state == State.OPEN || state == State.CLOSING) { |
| sendCloseCommand(container, replica.getDatanodeDetails(), false); |
| iterator.remove(); |
| } |
| |
| if (state == State.QUASI_CLOSED) { |
| // Send force close command if the BCSID matches |
| if (container.getSequenceId() == replica.getSequenceId()) { |
| sendCloseCommand(container, replica.getDatanodeDetails(), true); |
| iterator.remove(); |
| } |
| } |
| } |
| |
| // Now we are left with the replicas which are either unhealthy or |
| // the BCSID doesn't match. These replicas should be deleted. |
| |
| /* |
| * If we have unhealthy replicas we go under replicated and then |
| * replicate the healthy copy. |
| * |
| * We also make sure that we delete only one unhealthy replica at a time. |
| * |
| * If there are two unhealthy replica: |
| * - Delete first unhealthy replica |
| * - Re-replicate the healthy copy |
| * - Delete second unhealthy replica |
| * - Re-replicate the healthy copy |
| * |
| * Note: Only one action will be executed in a single ReplicationMonitor |
| * iteration. So to complete all the above actions we need four |
| * ReplicationMonitor iterations. |
| */ |
| |
| unhealthyReplicas.stream().findFirst().ifPresent(replica -> |
| sendDeleteCommand(container, replica.getDatanodeDetails(), false)); |
| |
| } |
| |
| /** |
| * Sends close container command for the given container to the given |
| * datanode. |
| * |
| * @param container Container to be closed |
| * @param datanode The datanode on which the container |
| * has to be closed |
| * @param force Should be set to true if we want to close a |
| * QUASI_CLOSED container |
| */ |
| private void sendCloseCommand(final ContainerInfo container, |
| final DatanodeDetails datanode, |
| final boolean force) { |
| |
| LOG.info("Sending close container command for container {}" + |
| " to datanode {}.", container.containerID(), datanode); |
| |
| CloseContainerCommand closeContainerCommand = |
| new CloseContainerCommand(container.getContainerID(), |
| container.getPipelineID(), force); |
| eventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND, |
| new CommandForDatanode<>(datanode.getUuid(), closeContainerCommand)); |
| } |
| |
| /** |
| * Sends replicate container command for the given container to the given |
| * datanode. |
| * |
| * @param container Container to be replicated |
| * @param datanode The destination datanode to replicate |
| * @param sources List of source nodes from where we can replicate |
| */ |
| private void sendReplicateCommand(final ContainerInfo container, |
| final DatanodeDetails datanode, |
| final List<DatanodeDetails> sources) { |
| |
| LOG.info("Sending replicate container command for container {}" + |
| " to datanode {}", container.containerID(), datanode); |
| |
| final ContainerID id = container.containerID(); |
| final ReplicateContainerCommand replicateCommand = |
| new ReplicateContainerCommand(id.getId(), sources); |
| inflightReplication.computeIfAbsent(id, k -> new ArrayList<>()); |
| sendAndTrackDatanodeCommand(datanode, replicateCommand, |
| action -> inflightReplication.get(id).add(action)); |
| } |
| |
| /** |
| * Sends delete container command for the given container to the given |
| * datanode. |
| * |
| * @param container Container to be deleted |
| * @param datanode The datanode on which the replica should be deleted |
| * @param force Should be set to true to delete an OPEN replica |
| */ |
| private void sendDeleteCommand(final ContainerInfo container, |
| final DatanodeDetails datanode, |
| final boolean force) { |
| |
| LOG.info("Sending delete container command for container {}" + |
| " to datanode {}", container.containerID(), datanode); |
| |
| final ContainerID id = container.containerID(); |
| final DeleteContainerCommand deleteCommand = |
| new DeleteContainerCommand(id.getId(), force); |
| inflightDeletion.computeIfAbsent(id, k -> new ArrayList<>()); |
| sendAndTrackDatanodeCommand(datanode, deleteCommand, |
| action -> inflightDeletion.get(id).add(action)); |
| } |
| |
| /** |
| * Creates CommandForDatanode with the given SCMCommand and fires |
| * DATANODE_COMMAND event to event queue. |
| * |
| * Tracks the command using the given tracker. |
| * |
| * @param datanode Datanode to which the command has to be sent |
| * @param command SCMCommand to be sent |
| * @param tracker Tracker which tracks the inflight actions |
| * @param <T> Type of SCMCommand |
| */ |
| private <T extends GeneratedMessage> void sendAndTrackDatanodeCommand( |
| final DatanodeDetails datanode, |
| final SCMCommand<T> command, |
| final Consumer<InflightAction> tracker) { |
| final CommandForDatanode<T> datanodeCommand = |
| new CommandForDatanode<>(datanode.getUuid(), command); |
| eventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND, datanodeCommand); |
| tracker.accept(new InflightAction(datanode, Time.monotonicNow())); |
| } |
| |
| /** |
| * Wrap the call to nodeManager.getNodeStatus, catching any |
| * NodeNotFoundException and instead throwing an IllegalStateException. |
| * @param dn The datanodeDetails to obtain the NodeStatus for |
| * @return NodeStatus corresponding to the given Datanode. |
| */ |
| private NodeStatus getNodeStatus(DatanodeDetails dn) { |
| try { |
| return nodeManager.getNodeStatus(dn); |
| } catch (NodeNotFoundException e) { |
| throw new IllegalStateException("Unable to find NodeStatus for "+dn, e); |
| } |
| } |
| |
| /** |
| * Compares the container state with the replica state. |
| * |
| * @param containerState ContainerState |
| * @param replicaState ReplicaState |
| * @return true if the state matches, false otherwise |
| */ |
| public static boolean compareState(final LifeCycleState containerState, |
| final State replicaState) { |
| switch (containerState) { |
| case OPEN: |
| return replicaState == State.OPEN; |
| case CLOSING: |
| return replicaState == State.CLOSING; |
| case QUASI_CLOSED: |
| return replicaState == State.QUASI_CLOSED; |
| case CLOSED: |
| return replicaState == State.CLOSED; |
| case DELETING: |
| return false; |
| case DELETED: |
| return false; |
| default: |
| return false; |
| } |
| } |
| |
| /** |
| * An open container is healthy if all its replicas are in the same state as |
| * the container. |
| * @param container The container to check |
| * @param replicas The replicas belonging to the container |
| * @return True if the container is healthy, false otherwise |
| */ |
| private boolean isOpenContainerHealthy( |
| ContainerInfo container, Set<ContainerReplica> replicas) { |
| LifeCycleState state = container.getState(); |
| return replicas.stream() |
| .allMatch(r -> ReplicationManager.compareState(state, r.getState())); |
| } |
| |
| @Override |
| public void getMetrics(MetricsCollector collector, boolean all) { |
| collector.addRecord(ReplicationManager.class.getSimpleName()) |
| .addGauge(ReplicationManagerMetrics.INFLIGHT_REPLICATION, |
| inflightReplication.size()) |
| .addGauge(ReplicationManagerMetrics.INFLIGHT_DELETION, |
| inflightDeletion.size()) |
| .endRecord(); |
| } |
| |
| @Override |
| public void onMessage(SafeModeStatus status, |
| EventPublisher publisher) { |
| if (!status.isInSafeMode() && !this.isRunning()) { |
| this.start(); |
| } |
| } |
| |
| /** |
| * Wrapper class to hold the InflightAction with its start time. |
| */ |
| private static final class InflightAction { |
| |
| private final DatanodeDetails datanode; |
| private final long time; |
| |
| private InflightAction(final DatanodeDetails datanode, |
| final long time) { |
| this.datanode = datanode; |
| this.time = time; |
| } |
| } |
| |
| /** |
| * Configuration used by the Replication Manager. |
| */ |
| @ConfigGroup(prefix = "hdds.scm.replication") |
| public static class ReplicationManagerConfiguration { |
| /** |
| * The frequency in which ReplicationMonitor thread should run. |
| */ |
| @Config(key = "thread.interval", |
| type = ConfigType.TIME, |
| defaultValue = "300s", |
| tags = {SCM, OZONE}, |
| description = "There is a replication monitor thread running inside " + |
| "SCM which takes care of replicating the containers in the " + |
| "cluster. This property is used to configure the interval in " + |
| "which that thread runs." |
| ) |
| private long interval = Duration.ofSeconds(300).toMillis(); |
| |
| /** |
| * Timeout for container replication & deletion command issued by |
| * ReplicationManager. |
| */ |
| @Config(key = "event.timeout", |
| type = ConfigType.TIME, |
| defaultValue = "30m", |
| tags = {SCM, OZONE}, |
| description = "Timeout for the container replication/deletion commands " |
| + "sent to datanodes. After this timeout the command will be " |
| + "retried.") |
| private long eventTimeout = Duration.ofMinutes(30).toMillis(); |
| public void setInterval(Duration interval) { |
| this.interval = interval.toMillis(); |
| } |
| |
| public void setEventTimeout(Duration timeout) { |
| this.eventTimeout = timeout.toMillis(); |
| } |
| |
| /** |
| * The number of container replica which must be available for a node to |
| * enter maintenance. |
| */ |
| @Config(key = "maintenance.replica.minimum", |
| type = ConfigType.INT, |
| defaultValue = "2", |
| tags = {SCM, OZONE}, |
| description = "The minimum number of container replicas which must " + |
| " be available for a node to enter maintenance. If putting a " + |
| " node into maintenance reduces the available replicas for any " + |
| " container below this level, the node will remain in the " + |
| " entering maintenance state until a new replica is created.") |
| private int maintenanceReplicaMinimum = 2; |
| |
| public void setMaintenanceReplicaMinimum(int replicaCount) { |
| this.maintenanceReplicaMinimum = replicaCount; |
| } |
| |
| public long getInterval() { |
| return interval; |
| } |
| |
| public long getEventTimeout() { |
| return eventTimeout; |
| } |
| |
| public int getMaintenanceReplicaMinimum() { |
| return maintenanceReplicaMinimum; |
| } |
| } |
| |
| /** |
| * Metric name definitions for Replication manager. |
| */ |
| public enum ReplicationManagerMetrics implements MetricsInfo { |
| |
| INFLIGHT_REPLICATION("Tracked inflight container replication requests."), |
| INFLIGHT_DELETION("Tracked inflight container deletion requests."); |
| |
| private final String desc; |
| |
| ReplicationManagerMetrics(String desc) { |
| this.desc = desc; |
| } |
| |
| @Override |
| public String description() { |
| return desc; |
| } |
| |
| @Override |
| public String toString() { |
| return new StringJoiner(", ", this.getClass().getSimpleName() + "{", "}") |
| .add("name=" + name()) |
| .add("description=" + desc) |
| .toString(); |
| } |
| } |
| } |