blob: 5540d737cb92d15df3599641d052a1f34bded9d7 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.container;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.StringJoiner;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.hadoop.hdds.conf.Config;
import org.apache.hadoop.hdds.conf.ConfigGroup;
import org.apache.hadoop.hdds.conf.ConfigType;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State;
import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementPolicy;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.metrics2.MetricsCollector;
import org.apache.hadoop.metrics2.MetricsInfo;
import org.apache.hadoop.metrics2.MetricsSource;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.ozone.lock.LockManager;
import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.Time;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.GeneratedMessage;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import static org.apache.hadoop.hdds.conf.ConfigTag.OZONE;
import static org.apache.hadoop.hdds.conf.ConfigTag.SCM;
import org.apache.ratis.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Replication Manager (RM) is responsible for making sure that the
 * containers are properly replicated. Replication Manager deals only
 * with Quasi Closed / Closed containers.
 */
public class ReplicationManager implements MetricsSource {
/** Logger used by this class. */
private static final Logger LOG =
LoggerFactory.getLogger(ReplicationManager.class);
/** Name under which start() registers this object as a metrics source. */
public static final String METRICS_SOURCE_NAME = "SCMReplicationManager";
/**
 * Reference to the ContainerManager. Used to list container IDs and to
 * look up container information and replicas.
 */
private final ContainerManager containerManager;
/**
 * PlacementPolicy which is used to identify where a container
 * should be replicated.
 */
private final ContainerPlacementPolicy containerPlacement;
/**
 * EventPublisher used to fire DATANODE_COMMAND events carrying close,
 * replicate and delete container commands.
 */
private final EventPublisher eventPublisher;
/**
 * Used for locking a container using its ID while processing it.
 */
private final LockManager<ContainerID> lockManager;
/**
 * Tracks, per container, the replication commands which were issued by
 * ReplicationManager and are not yet complete. An action is dropped once
 * a replica on its target datanode shows up in the container's replica
 * set, or once it is older than the configured event timeout.
 */
private final Map<ContainerID, List<InflightAction>> inflightReplication;
/**
 * Tracks, per container, the deletion commands which were issued by
 * ReplicationManager and are not yet complete. An action is dropped once
 * no replica on its target datanode remains in the container's replica
 * set, or once it is older than the configured event timeout.
 */
private final Map<ContainerID, List<InflightAction>> inflightDeletion;
/**
 * ReplicationManager specific configuration (monitor interval and
 * command timeout).
 */
private final ReplicationManagerConfiguration conf;
/**
 * ReplicationMonitor thread is the one which wakes up at configured
 * interval and processes all the containers. Created by start().
 */
private Thread replicationMonitor;
/**
 * Flag used for checking if the ReplicationMonitor thread is running or
 * not. Set by start() and cleared by stop(); the monitor loop exits when
 * it observes false.
 */
private volatile boolean running;
/**
 * Creates a ReplicationManager. The monitor thread is not started here;
 * call {@link #start()} to begin processing containers.
 *
 * @param conf ReplicationManager specific configuration
 * @param containerManager source of container and replica information
 * @param containerPlacement policy used to choose replication targets
 * @param eventPublisher publisher used to send commands to datanodes
 * @param lockManager lock manager used to lock a container by its ID
 */
public ReplicationManager(final ReplicationManagerConfiguration conf,
    final ContainerManager containerManager,
    final ContainerPlacementPolicy containerPlacement,
    final EventPublisher eventPublisher,
    final LockManager<ContainerID> lockManager) {
  // Collaborators handed in by the caller.
  this.conf = conf;
  this.containerManager = containerManager;
  this.containerPlacement = containerPlacement;
  this.eventPublisher = eventPublisher;
  this.lockManager = lockManager;

  // Internal state: nothing is in flight and the monitor is not running.
  this.inflightReplication = new ConcurrentHashMap<>();
  this.inflightDeletion = new ConcurrentHashMap<>();
  this.running = false;
}
/**
 * Registers this object as a metrics source and starts the Replication
 * Monitor thread as a daemon thread. Does nothing but log if the monitor
 * is already running.
 */
public synchronized void start() {
  if (isRunning()) {
    LOG.info("Replication Monitor Thread is already running.");
    return;
  }

  DefaultMetricsSystem.instance().register(METRICS_SOURCE_NAME,
      "SCM Replication manager (closed container replication) related "
          + "metrics",
      this);
  LOG.info("Starting Replication Monitor Thread.");
  running = true;

  final Thread monitor = new Thread(this::run);
  monitor.setName("ReplicationMonitor");
  monitor.setDaemon(true);
  replicationMonitor = monitor;
  monitor.start();
}
/**
 * Tells whether the Replication Monitor is running.
 *
 * @return true if the running flag is set, or if the flag is cleared but
 *         the monitor thread exists and is still alive; false otherwise
 */
public boolean isRunning() {
  if (running) {
    return true;
  }
  // The flag is cleared; the thread may nevertheless still be alive.
  synchronized (this) {
    return replicationMonitor != null && replicationMonitor.isAlive();
  }
}
/**
 * Wakes up a thread waiting on this object's monitor, i.e. the Replication
 * Monitor thread when it is waiting between two iterations, so that the
 * containers are processed immediately.
 */
@VisibleForTesting
@SuppressFBWarnings(value="NN_NAKED_NOTIFY",
    justification="Used only for testing")
public synchronized void processContainersNow() {
  this.notify();
}
/**
 * Stops Replication Monitor thread.
 *
 * Clears the tracked inflight replication and deletion actions, clears the
 * running flag, unregisters the metrics source registered by
 * {@link #start()} and wakes the monitor thread so that it can observe the
 * cleared flag. Does nothing but log if the monitor is not running.
 */
public synchronized void stop() {
  if (running) {
    LOG.info("Stopping Replication Monitor Thread.");
    inflightReplication.clear();
    inflightDeletion.clear();
    running = false;
    // start() registers this object under METRICS_SOURCE_NAME. Remove the
    // registration here so that a later start() does not try to register
    // the same source name a second time.
    DefaultMetricsSystem.instance().unregisterSource(METRICS_SOURCE_NAME);
    // Wake every thread waiting on this monitor; the monitor thread then
    // leaves its loop because running is false.
    notifyAll();
  } else {
    LOG.info("Replication Monitor Thread is not running.");
  }
}
/**
 * Body of the ReplicationMonitor thread. While the running flag is set it
 * processes every container known to the ContainerManager, logs how long
 * the pass took and then waits on this object for the configured interval
 * (or until notified).
 *
 * Any Throwable escaping the loop is logged and terminates the process
 * through ExitUtil.
 */
private synchronized void run() {
  try {
    while (running) {
      final long startedAt = Time.monotonicNow();
      final Set<ContainerID> ids = containerManager.getContainerIDs();
      for (ContainerID id : ids) {
        processContainer(id);
      }
      final long elapsed = Time.monotonicNow() - startedAt;
      LOG.info("Replication Monitor Thread took {} milliseconds for" +
          " processing {} containers.", elapsed, ids.size());
      // Releases the monitor while sleeping, so stop() and
      // processContainersNow() can get in and notify.
      wait(conf.getInterval());
    }
  } catch (Throwable t) {
    // When we get runtime exception, we should terminate SCM.
    LOG.error("Exception in Replication Monitor Thread.", t);
    ExitUtil.terminate(1, t);
  }
}
/**
 * Processes a single container while holding its lock.
 *
 * Depending on the container state this either does nothing (OPEN),
 * re-sends close commands (CLOSING), force closes the container
 * (QUASI_CLOSED with enough quasi closed replicas), or reconciles the
 * inflight actions and then handles an under replicated, over replicated
 * or unstable container. A container unknown to the ContainerManager is
 * only logged.
 *
 * @param id ContainerID
 */
private void processContainer(ContainerID id) {
  lockManager.lock(id);
  try {
    final ContainerInfo info = containerManager.getContainer(id);
    final Set<ContainerReplica> replicaSet =
        containerManager.getContainerReplicas(info.containerID());
    final LifeCycleState lifeCycle = info.getState();

    // OPEN containers are left alone.
    if (lifeCycle == LifeCycleState.OPEN) {
      return;
    }

    // CLOSING container: its replicas can be OPEN or CLOSING, in both
    // cases the close container command has to be sent again.
    if (lifeCycle == LifeCycleState.CLOSING) {
      for (ContainerReplica replica : replicaSet) {
        sendCloseCommand(info, replica.getDatanodeDetails(), false);
      }
      return;
    }

    // QUASI_CLOSED container: close it if that is possible.
    if (lifeCycle == LifeCycleState.QUASI_CLOSED
        && canForceCloseContainer(info, replicaSet)) {
      forceCloseContainer(info, replicaSet);
      return;
    }

    // Reconcile the inflight actions before looking at replica counts:
    // an action is removed when it is completed or when it has timed out.
    // A replication is complete once a replica exists on its target node,
    // a deletion is complete once no replica exists on its target node.
    final Predicate<InflightAction> replicaOnTarget = action ->
        replicaSet.stream().anyMatch(
            r -> r.getDatanodeDetails().equals(action.datanode));
    updateInflightAction(info, inflightReplication, replicaOnTarget);
    updateInflightAction(info, inflightDeletion, replicaOnTarget.negate());

    // Healthy means: exact number of replicas, all of them in the same
    // state as the container. Nothing to do in that case.
    if (isContainerHealthy(info, replicaSet)) {
      return;
    }

    if (isContainerUnderReplicated(info, replicaSet)) {
      handleUnderReplicatedContainer(info, replicaSet);
    } else if (isContainerOverReplicated(info, replicaSet)) {
      handleOverReplicatedContainer(info, replicaSet);
    } else {
      // Neither under nor over replicated, yet not healthy: the container
      // has unhealthy/corrupted replica.
      handleUnstableContainer(info, replicaSet);
    }
  } catch (ContainerNotFoundException ex) {
    LOG.warn("Missing container {}.", id);
  } finally {
    lockManager.unlock(id);
  }
}
/**
 * Drops the inflight actions of a container which are no longer pending:
 * actions issued before the event timeout deadline and actions matched by
 * the given filter. When no action is left, the container's entry is
 * removed from the map.
 *
 * @param container Container to update
 * @param inflightActions inflightReplication (or) inflightDeletion
 * @param filter filter to check if the operation is completed
 */
private void updateInflightAction(final ContainerInfo container,
    final Map<ContainerID, List<InflightAction>> inflightActions,
    final Predicate<InflightAction> filter) {
  final ContainerID id = container.containerID();
  final List<InflightAction> pending = inflightActions.get(id);
  if (pending == null) {
    return;
  }

  final long deadline = Time.monotonicNow() - conf.getEventTimeout();
  final Predicate<InflightAction> timedOut =
      action -> action.time < deadline;
  // Timed out actions go first, completed ones afterwards.
  pending.removeIf(timedOut);
  pending.removeIf(filter);

  if (pending.isEmpty()) {
    inflightActions.remove(id);
  }
}
/**
 * Tells whether the container is healthy from the ReplicationMonitor's
 * point of view: the number of replicas equals the replication factor and
 * every replica is in the state matching the container state.
 *
 * @param container Container to check
 * @param replicas Set of ContainerReplicas
 * @return true if the container is healthy, false otherwise
 */
private boolean isContainerHealthy(final ContainerInfo container,
    final Set<ContainerReplica> replicas) {
  if (container.getReplicationFactor().getNumber() != replicas.size()) {
    return false;
  }
  for (ContainerReplica replica : replicas) {
    if (!compareState(container.getState(), replica.getState())) {
      return false;
    }
  }
  return true;
}
/**
 * Tells whether the estimated replica count (existing replicas adjusted by
 * inflight replications and deletions) is below the replication factor.
 *
 * @param container Container to check
 * @param replicas Set of ContainerReplicas
 * @return true if the container is under replicated, false otherwise
 */
private boolean isContainerUnderReplicated(final ContainerInfo container,
    final Set<ContainerReplica> replicas) {
  final int expected = container.getReplicationFactor().getNumber();
  final int estimated = getReplicaCount(container.containerID(), replicas);
  return estimated < expected;
}
/**
 * Tells whether the estimated replica count (existing replicas adjusted by
 * inflight replications and deletions) is above the replication factor.
 *
 * @param container Container to check
 * @param replicas Set of ContainerReplicas
 * @return true if the container is over replicated, false otherwise
 */
private boolean isContainerOverReplicated(final ContainerInfo container,
    final Set<ContainerReplica> replicas) {
  final int expected = container.getReplicationFactor().getNumber();
  final int estimated = getReplicaCount(container.containerID(), replicas);
  return estimated > expected;
}
/**
 * Estimates the replica count of the given container: the number of
 * existing replicas, plus the inflight replications, minus the inflight
 * deletions tracked for that container.
 *
 * @param id ContainerID
 * @param replicas Set of existing replicas
 * @return number of estimated replicas for this container
 */
private int getReplicaCount(final ContainerID id,
    final Set<ContainerReplica> replicas) {
  final int replicating =
      inflightReplication.getOrDefault(id, Collections.emptyList()).size();
  final int deleting =
      inflightDeletion.getOrDefault(id, Collections.emptyList()).size();
  return replicas.size() + replicating - deleting;
}
/**
 * Tells whether a QUASI_CLOSED container can be force closed, which is the
 * case when the number of distinct origin node IDs among its QUASI_CLOSED
 * replicas is greater than half of the replication factor (integer
 * division).
 *
 * @param container Container to check; must be in QUASI_CLOSED state
 * @param replicas Set of ContainerReplicas
 * @return true if we can force close the container, false otherwise
 */
private boolean canForceCloseContainer(final ContainerInfo container,
    final Set<ContainerReplica> replicas) {
  Preconditions.assertTrue(container.getState() ==
      LifeCycleState.QUASI_CLOSED);
  final Set<UUID> quasiClosedOrigins = replicas.stream()
      .filter(r -> r.getState() == State.QUASI_CLOSED)
      .map(ContainerReplica::getOriginDatanodeId)
      .collect(Collectors.toSet());
  final int quorumThreshold =
      container.getReplicationFactor().getNumber() / 2;
  return quasiClosedOrigins.size() > quorumThreshold;
}
/**
 * Sends a force close command to the QUASI_CLOSED replica(s) which carry
 * the highest sequence Id among all QUASI_CLOSED replicas.
 *
 * <p>
 * Note: We should force close the container only if >50% (quorum)
 * of replicas with unique originNodeId are in QUASI_CLOSED state.
 * </p>
 *
 * @param container ContainerInfo; must be in QUASI_CLOSED state
 * @param replicas Set of ContainerReplicas
 */
private void forceCloseContainer(final ContainerInfo container,
    final Set<ContainerReplica> replicas) {
  Preconditions.assertTrue(container.getState() ==
      LifeCycleState.QUASI_CLOSED);

  final List<ContainerReplica> candidates = replicas.stream()
      .filter(r -> r.getState() == State.QUASI_CLOSED)
      .collect(Collectors.toList());

  // -1 stands for "no sequence Id found".
  final Long highestSequenceId = candidates.stream()
      .map(ContainerReplica::getSequenceId)
      .max(Long::compare)
      .orElse(-1L);

  LOG.info("Force closing container {} with BCSID {}," +
      " which is in QUASI_CLOSED state.",
      container.containerID(), highestSequenceId);

  if (highestSequenceId != -1L) {
    for (ContainerReplica replica : candidates) {
      if (replica.getSequenceId().equals(highestSequenceId)) {
        sendCloseCommand(container, replica.getDatanodeDetails(), true);
      }
    }
  }
}
/**
 * Handles an under replicated container: picks the source replicas, asks
 * the ContainerPlacementPolicy for as many new datanodes as replicas are
 * missing and sends a replicate container command to each of them.
 *
 * Sources are the QUASI_CLOSED / CLOSED replicas which have no deletion in
 * flight, ordered by descending sequence Id. Datanodes which already hold
 * a replica or are the target of an inflight replication are excluded from
 * placement. If there is no source, a warning is logged and nothing is
 * sent. An IOException is logged and otherwise ignored.
 *
 * @param container ContainerInfo
 * @param replicas Set of ContainerReplicas
 */
private void handleUnderReplicatedContainer(final ContainerInfo container,
    final Set<ContainerReplica> replicas) {
  LOG.debug("Handling underreplicated container: {}",
      container.getContainerID());
  try {
    final ContainerID id = container.containerID();

    // Datanodes whose replica is about to be deleted must not be sources.
    final List<DatanodeDetails> beingDeleted = new ArrayList<>();
    for (InflightAction action :
        inflightDeletion.getOrDefault(id, Collections.emptyList())) {
      beingDeleted.add(action.datanode);
    }

    final List<DatanodeDetails> sources = replicas.stream()
        .filter(r -> {
          final State state = r.getState();
          return state == State.QUASI_CLOSED || state == State.CLOSED;
        })
        .filter(r -> !beingDeleted.contains(r.getDatanodeDetails()))
        .sorted((r1, r2) -> r2.getSequenceId().compareTo(r1.getSequenceId()))
        .map(ContainerReplica::getDatanodeDetails)
        .collect(Collectors.toList());

    if (sources.isEmpty()) {
      LOG.warn("Cannot replicate container {}, no healthy replica found.",
          id);
      return;
    }

    final int replicationFactor =
        container.getReplicationFactor().getNumber();
    final int delta = replicationFactor - getReplicaCount(id, replicas);

    // Exclude every node which has, or is about to receive, a replica.
    final List<DatanodeDetails> excludeList = new ArrayList<>();
    for (ContainerReplica replica : replicas) {
      excludeList.add(replica.getDatanodeDetails());
    }
    final List<InflightAction> pendingReplications =
        inflightReplication.get(id);
    if (pendingReplications != null) {
      for (InflightAction action : pendingReplications) {
        excludeList.add(action.datanode);
      }
    }

    final List<DatanodeDetails> targets = containerPlacement
        .chooseDatanodes(excludeList, null, delta,
            container.getUsedBytes());

    LOG.info("Container {} is under replicated. Expected replica count" +
        " is {}, but found {}.", id, replicationFactor,
        replicationFactor - delta);

    for (DatanodeDetails target : targets) {
      sendReplicateCommand(container, target, sources);
    }
  } catch (IOException ex) {
    LOG.warn("Exception while replicating container {}.",
        container.getContainerID(), ex);
  }
}
/**
 * If the given container is over replicated, identify the datanode(s)
 * to delete the container and send delete container command to the
 * identified datanode(s).
 *
 * One replica in the container's state is retained per origin node Id;
 * only the remaining replicas are eligible for deletion, with replicas
 * whose state does not match the container state deleted first. The number
 * of delete commands is the excess replica count (inflight deletions
 * already subtracted), capped by the number of eligible replicas.
 *
 * @param container ContainerInfo
 * @param replicas Set of ContainerReplicas
 */
private void handleOverReplicatedContainer(final ContainerInfo container,
    final Set<ContainerReplica> replicas) {
  final ContainerID id = container.containerID();
  final int replicationFactor = container.getReplicationFactor().getNumber();
  // Dont consider inflight replication while calculating excess here.
  final int excess = replicas.size() - replicationFactor -
      inflightDeletion.getOrDefault(id, Collections.emptyList()).size();
  if (excess > 0) {
    LOG.info("Container {} is over replicated. Expected replica count" +
        " is {}, but found {}.", id, replicationFactor,
        replicationFactor + excess);
    final Map<UUID, ContainerReplica> uniqueReplicas =
        new LinkedHashMap<>();
    replicas.stream()
        .filter(r -> compareState(container.getState(), r.getState()))
        .forEach(r -> uniqueReplicas
            .putIfAbsent(r.getOriginDatanodeId(), r));
    // Retain one healthy replica per origin node Id.
    final List<ContainerReplica> eligibleReplicas = new ArrayList<>(replicas);
    eligibleReplicas.removeAll(uniqueReplicas.values());
    final List<ContainerReplica> unhealthyReplicas = eligibleReplicas
        .stream()
        .filter(r -> !compareState(container.getState(), r.getState()))
        .collect(Collectors.toList());
    //Move the unhealthy replicas to the front of eligible replicas to delete
    eligibleReplicas.removeAll(unhealthyReplicas);
    eligibleReplicas.addAll(0, unhealthyReplicas);
    // There can be fewer eligible replicas than excess replicas, e.g. when
    // every replica is the retained copy of a distinct origin node. Never
    // index past the end of the list: the resulting
    // IndexOutOfBoundsException would escape to the monitor loop.
    final int deletions = Math.min(excess, eligibleReplicas.size());
    if (deletions < excess) {
      LOG.info("Container {} has {} excess replica(s), but only {} of them"
          + " can be deleted while retaining one healthy replica per"
          + " origin node.", id, excess, deletions);
    }
    for (int i = 0; i < deletions; i++) {
      sendDeleteCommand(container,
          eligibleReplicas.get(i).getDatanodeDetails(), true);
    }
  }
}
/**
 * Handles unstable container.
 * A container is inconsistent if any of the replica state doesn't
 * match the container state. The action taken for such a replica depends
 * on the replica state:
 * OPEN / CLOSING replicas get a close command, QUASI_CLOSED replicas whose
 * sequence Id equals the container's get a force close command, and of all
 * remaining mismatching replicas the first one gets a delete command.
 *
 * @param container ContainerInfo
 * @param replicas Set of ContainerReplicas
 */
private void handleUnstableContainer(final ContainerInfo container,
    final Set<ContainerReplica> replicas) {
  // First mismatching replica which can be fixed neither by a close nor by
  // a force close command.
  ContainerReplica firstToDelete = null;

  for (ContainerReplica replica : replicas) {
    final State state = replica.getState();
    if (compareState(container.getState(), state)) {
      // Replica state matches the container state, nothing to do.
      continue;
    }
    if (state == State.OPEN || state == State.CLOSING) {
      sendCloseCommand(container, replica.getDatanodeDetails(), false);
      continue;
    }
    // Send force close command if the BCSID matches.
    // NOTE(review): '==' is kept from the original; it is a numeric
    // comparison only if ContainerInfo#getSequenceId returns a primitive
    // long - confirm against ContainerInfo.
    if (state == State.QUASI_CLOSED
        && container.getSequenceId() == replica.getSequenceId()) {
      sendCloseCommand(container, replica.getDatanodeDetails(), true);
      continue;
    }
    if (firstToDelete == null) {
      firstToDelete = replica;
    }
  }

  /*
   * The replicas left over are either unhealthy or their BCSID doesn't
   * match; they should be deleted. Deleting makes the container under
   * replicated, after which the healthy copy is replicated again.
   *
   * Only one unhealthy replica is deleted at a time. With two unhealthy
   * replicas the sequence is:
   *   - Delete first unhealthy replica
   *   - Re-replicate the healthy copy
   *   - Delete second unhealthy replica
   *   - Re-replicate the healthy copy
   *
   * Note: Only one action will be executed in a single ReplicationMonitor
   * iteration. So to complete all the above actions we need four
   * ReplicationMonitor iterations.
   */
  if (firstToDelete != null) {
    sendDeleteCommand(container, firstToDelete.getDatanodeDetails(), false);
  }
}
/**
 * Fires a DATANODE_COMMAND event which carries a close container command
 * for the given container, addressed to the given datanode. Close commands
 * are not tracked as inflight actions.
 *
 * @param container Container to be closed
 * @param datanode The datanode on which the container
 *                  has to be closed
 * @param force Should be set to true if we want to close a
 *               QUASI_CLOSED container
 */
private void sendCloseCommand(final ContainerInfo container,
    final DatanodeDetails datanode,
    final boolean force) {
  LOG.info("Sending close container command for container {}" +
      " to datanode {}.", container.containerID(), datanode);
  final CloseContainerCommand command = new CloseContainerCommand(
      container.getContainerID(), container.getPipelineID(), force);
  eventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND,
      new CommandForDatanode<>(datanode.getUuid(), command));
}
/**
 * Sends a replicate container command for the given container to the given
 * datanode and records it as an inflight replication of that container.
 *
 * @param container Container to be replicated
 * @param datanode The destination datanode to replicate
 * @param sources List of source nodes from where we can replicate
 */
private void sendReplicateCommand(final ContainerInfo container,
    final DatanodeDetails datanode,
    final List<DatanodeDetails> sources) {
  final ContainerID id = container.containerID();
  LOG.info("Sending replicate container command for container {}" +
      " to datanode {}", id, datanode);
  final ReplicateContainerCommand command =
      new ReplicateContainerCommand(id.getId(), sources);
  // Make sure the container has an action list before the command is sent.
  final List<InflightAction> tracked =
      inflightReplication.computeIfAbsent(id, k -> new ArrayList<>());
  sendAndTrackDatanodeCommand(datanode, command, tracked::add);
}
/**
 * Sends a delete container command for the given container to the given
 * datanode and records it as an inflight deletion of that container.
 *
 * @param container Container to be deleted
 * @param datanode The datanode on which the replica should be deleted
 * @param force Should be set to true to delete an OPEN replica
 */
private void sendDeleteCommand(final ContainerInfo container,
    final DatanodeDetails datanode,
    final boolean force) {
  final ContainerID id = container.containerID();
  LOG.info("Sending delete container command for container {}" +
      " to datanode {}", id, datanode);
  final DeleteContainerCommand command =
      new DeleteContainerCommand(id.getId(), force);
  // Make sure the container has an action list before the command is sent.
  final List<InflightAction> tracked =
      inflightDeletion.computeIfAbsent(id, k -> new ArrayList<>());
  sendAndTrackDatanodeCommand(datanode, command, tracked::add);
}
/**
 * Wraps the given SCMCommand into a CommandForDatanode, fires it as a
 * DATANODE_COMMAND event and afterwards hands a new InflightAction,
 * stamped with the current monotonic time, to the given tracker.
 *
 * @param datanode Datanode to which the command has to be sent
 * @param command SCMCommand to be sent
 * @param tracker Tracker which tracks the inflight actions
 * @param <T> Type of SCMCommand
 */
private <T extends GeneratedMessage> void sendAndTrackDatanodeCommand(
    final DatanodeDetails datanode,
    final SCMCommand<T> command,
    final Consumer<InflightAction> tracker) {
  eventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND,
      new CommandForDatanode<>(datanode.getUuid(), command));
  final InflightAction issued =
      new InflightAction(datanode, Time.monotonicNow());
  tracker.accept(issued);
}
/**
 * Tells whether a replica state corresponds to a container state.
 * OPEN, CLOSING, QUASI_CLOSED and CLOSED containers match replicas in the
 * state of the same name; every other container state matches nothing.
 *
 * @param containerState ContainerState
 * @param replicaState ReplicaState
 * @return true if the state matches, false otherwise
 */
private static boolean compareState(final LifeCycleState containerState,
    final State replicaState) {
  final State expected;
  switch (containerState) {
  case OPEN:
    expected = State.OPEN;
    break;
  case CLOSING:
    expected = State.CLOSING;
    break;
  case QUASI_CLOSED:
    expected = State.QUASI_CLOSED;
    break;
  case CLOSED:
    expected = State.CLOSED;
    break;
  default:
    // DELETING, DELETED and any other state never match a replica state.
    return false;
  }
  return replicaState == expected;
}
/**
 * Publishes one metrics record with two gauges: the number of containers
 * which currently have an entry in the inflight replication map and in the
 * inflight deletion map respectively.
 *
 * @param collector collector to add the record to
 * @param all not used by this implementation
 */
@Override
public void getMetrics(MetricsCollector collector, boolean all) {
  final int replicating = inflightReplication.size();
  final int deleting = inflightDeletion.size();
  collector.addRecord(ReplicationManager.class.getSimpleName())
      .addGauge(ReplicationManagerMetrics.INFLIGHT_REPLICATION, replicating)
      .addGauge(ReplicationManagerMetrics.INFLIGHT_DELETION, deleting)
      .endRecord();
}
/**
 * Immutable record of a command sent to a datanode: the datanode it was
 * sent to and the time at which it was issued.
 */
private static final class InflightAction {

  /** Datanode the command was sent to. */
  private final DatanodeDetails datanode;

  /** Time at which the command was issued. */
  private final long time;

  private InflightAction(final DatanodeDetails target,
      final long issuedAt) {
    this.datanode = target;
    this.time = issuedAt;
  }
}
/**
 * Configuration used by the Replication Manager.
 */
@ConfigGroup(prefix = "hdds.scm.replication")
public static class ReplicationManagerConfiguration {
  /**
   * The frequency in which ReplicationMonitor thread should run, in
   * milliseconds. Defaults to five minutes.
   */
  private long interval = 5 * 60 * 1000;
  /**
   * Timeout for container replication & deletion command issued by
   * ReplicationManager, in milliseconds. Defaults to ten minutes.
   */
  private long eventTimeout = 10 * 60 * 1000;

  /**
   * Sets the interval at which the ReplicationMonitor thread runs.
   *
   * @param interval interval between two runs
   */
  // The description used to be the text of the heartbeat processing
  // interval, which has nothing to do with this property.
  @Config(key = "thread.interval",
      type = ConfigType.TIME,
      defaultValue = "300s",
      tags = {SCM, OZONE},
      description = "There is a replication monitor thread running inside "
          + "SCM which takes care of replicating the containers in the "
          + "cluster. This property is used to configure the interval in "
          + "which that thread runs."
  )
  public void setInterval(long interval) {
    this.interval = interval;
  }

  /**
   * Sets the timeout after which an inflight replication or deletion
   * command is no longer tracked.
   *
   * @param eventTimeout timeout of replication/deletion commands
   */
  @Config(key = "event.timeout",
      type = ConfigType.TIME,
      defaultValue = "10m",
      tags = {SCM, OZONE},
      description = "Timeout for the container replication/deletion commands "
          + "sent to datanodes. After this timeout the command will be "
          + "retried.")
  public void setEventTimeout(long eventTimeout) {
    this.eventTimeout = eventTimeout;
  }

  /**
   * @return interval at which the ReplicationMonitor thread runs
   */
  public long getInterval() {
    return interval;
  }

  /**
   * @return timeout of replication/deletion commands
   */
  public long getEventTimeout() {
    return eventTimeout;
  }
}
/**
 * Names and descriptions of the metrics published by the Replication
 * manager.
 */
public enum ReplicationManagerMetrics implements MetricsInfo {

  INFLIGHT_REPLICATION("Tracked inflight container replication requests."),
  INFLIGHT_DELETION("Tracked inflight container deletion requests.");

  /** Human readable description of the metric. */
  private final String desc;

  ReplicationManagerMetrics(String desc) {
    this.desc = desc;
  }

  @Override
  public String description() {
    return desc;
  }

  /**
   * @return the simple class name followed by the metric name and
   *         description in braces
   */
  @Override
  public String toString() {
    final String type = this.getClass().getSimpleName();
    return type + "{" + "name=" + name() + ", "
        + "description=" + desc + "}";
  }
}
}