/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hdds.scm.node;

import java.io.IOException;
import java.util.Optional;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.container.ContainerException;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerManager;
import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
import org.apache.hadoop.hdds.server.events.EventHandler;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.hadoop.hdds.scm.events.SCMEvents.CLOSE_CONTAINER;

/**
 * Handles the Dead Node event. When a datanode is declared dead, this handler
 * destroys any pipelines still associated with the node, closes any containers
 * that are still OPEN on it, and removes the node's container replicas from
 * the ContainerManager.
 */
public class DeadNodeHandler implements EventHandler<DatanodeDetails> {

  private final NodeManager nodeManager;
  private final PipelineManager pipelineManager;
  private final ContainerManager containerManager;

  private static final Logger LOG =
      LoggerFactory.getLogger(DeadNodeHandler.class);

public DeadNodeHandler(final NodeManager nodeManager,
final PipelineManager pipelineManager,
final ContainerManager containerManager) {
this.nodeManager = nodeManager;
this.pipelineManager = pipelineManager;
this.containerManager = containerManager;
  }

@Override
public void onMessage(final DatanodeDetails datanodeDetails,
final EventPublisher publisher) {
try {
/*
* We should have already destroyed all the pipelines on this datanode
* when it was marked as stale. Destroy pipeline should also have closed
* all the containers on this datanode.
*
* Ideally we should not have any pipeline or OPEN containers now.
*
* To be on a safer side, we double check here and take appropriate
* action.
*/
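      // Tear down any pipelines that are still registered against this node.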
destroyPipelines(datanodeDetails);
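      // Fire CLOSE_CONTAINER for any containers on this node that are still
      // OPEN.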
closeContainers(datanodeDetails, publisher);
// Remove the container replicas associated with the dead node.
removeContainerReplicas(datanodeDetails);
} catch (NodeNotFoundException ex) {
      // This should never happen; we cannot get a dead node event for an
      // unregistered datanode.
      LOG.error("DeadNode event for an unregistered node: {}!",
          datanodeDetails);
}
  }

/**
* Destroys all the pipelines on the given datanode if there are any.
*
* @param datanodeDetails DatanodeDetails
*/
private void destroyPipelines(final DatanodeDetails datanodeDetails) {
Optional.ofNullable(nodeManager.getPipelines(datanodeDetails))
.ifPresent(pipelines ->
pipelines.forEach(id -> {
try {
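                // Pass 'false' so the pipeline is destroyed immediately
                // rather than via the delayed (on-timeout) destroy path.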
pipelineManager.finalizeAndDestroyPipeline(
pipelineManager.getPipeline(id), false);
              } catch (PipelineNotFoundException ignore) {
                // The pipeline is already gone from the PipelineManager,
                // so there is nothing left to clean up here.
} catch (IOException ex) {
LOG.warn("Exception while finalizing pipeline {}",
id, ex);
}
}));
  }

  /**
   * Fires a CLOSE_CONTAINER event for each open container on the
   * given datanode.
   *
   * @param datanodeDetails DatanodeDetails
   * @param publisher EventPublisher
   * @throws NodeNotFoundException if the datanode is not registered with
   *                               the NodeManager
   */
private void closeContainers(final DatanodeDetails datanodeDetails,
final EventPublisher publisher)
throws NodeNotFoundException {
nodeManager.getContainers(datanodeDetails)
.forEach(id -> {
try {
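            // Only containers still in the OPEN state need an explicit close;
            // containers in other states are already closing or closed.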
final ContainerInfo container = containerManager.getContainer(id);
if (container.getState() == HddsProtos.LifeCycleState.OPEN) {
publisher.fireEvent(CLOSE_CONTAINER, id);
}
} catch (ContainerNotFoundException cnfe) {
LOG.warn("Container {} is not managed by ContainerManager.",
id, cnfe);
}
});
  }

/**
* Removes the ContainerReplica of the dead datanode from the containers
* which are hosted by that datanode.
*
* @param datanodeDetails DatanodeDetails
   * @throws NodeNotFoundException if the datanode is not registered with
   *                               the NodeManager
*/
private void removeContainerReplicas(final DatanodeDetails datanodeDetails)
throws NodeNotFoundException {
nodeManager.getContainers(datanodeDetails)
.forEach(id -> {
try {
final ContainerInfo container = containerManager.getContainer(id);
            // Identify and remove the ContainerReplica of the dead node.
containerManager.getContainerReplicas(id)
.stream()
.filter(r -> r.getDatanodeDetails().equals(datanodeDetails))
.findFirst()
.ifPresent(replica -> {
try {
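                    // Drop the dead node's replica from the container's
                    // replica set tracked by the ContainerManager.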
containerManager.removeContainerReplica(id, replica);
} catch (ContainerException ex) {
LOG.warn("Exception while removing container replica #{} " +
"of container {}.", replica, container, ex);
}
});
} catch (ContainerNotFoundException cnfe) {
LOG.warn("Container {} is not managed by ContainerManager.",
id, cnfe);
}
});
}
}