blob: 2227df6156370dbea02ffa6f245448ced3f01175 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.container;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.hdds.scm.block.PendingDeleteStatusList;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
.ContainerReportFromDatanode;
import org.apache.hadoop.hdds.server.events.EventHandler;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
/**
* Handles container reports from datanode.
*/
public class ContainerReportHandler extends AbstractContainerReportHandler
implements EventHandler<ContainerReportFromDatanode> {
private static final Logger LOG =
LoggerFactory.getLogger(ContainerReportHandler.class);
private final NodeManager nodeManager;
private final ContainerManager containerManager;
/**
* Constructs ContainerReportHandler instance with the
* given NodeManager and ContainerManager instance.
*
* @param nodeManager NodeManager instance
* @param containerManager ContainerManager instance
*/
public ContainerReportHandler(final NodeManager nodeManager,
final ContainerManager containerManager) {
super(containerManager, LOG);
this.nodeManager = nodeManager;
this.containerManager = containerManager;
}
/**
* Process the container reports from datanodes.
*
* @param reportFromDatanode Container Report
* @param publisher EventPublisher reference
*/
@Override
public void onMessage(final ContainerReportFromDatanode reportFromDatanode,
final EventPublisher publisher) {
final DatanodeDetails datanodeDetails =
reportFromDatanode.getDatanodeDetails();
final ContainerReportsProto containerReport =
reportFromDatanode.getReport();
try {
final List<ContainerReplicaProto> replicas =
containerReport.getReportsList();
final Set<ContainerID> containersInSCM =
nodeManager.getContainers(datanodeDetails);
final Set<ContainerID> containersInDn = replicas.parallelStream()
.map(ContainerReplicaProto::getContainerID)
.map(ContainerID::valueof).collect(Collectors.toSet());
final Set<ContainerID> missingReplicas = new HashSet<>(containersInSCM);
missingReplicas.removeAll(containersInDn);
processContainerReplicas(datanodeDetails, replicas);
processMissingReplicas(datanodeDetails, missingReplicas);
updateDeleteTransaction(datanodeDetails, replicas, publisher);
/*
* Update the latest set of containers for this datanode in
* NodeManager
*/
nodeManager.setContainers(datanodeDetails, containersInDn);
containerManager.notifyContainerReportProcessing(true, true);
} catch (NodeNotFoundException ex) {
containerManager.notifyContainerReportProcessing(true, false);
LOG.error("Received container report from unknown datanode {} {}",
datanodeDetails, ex);
}
}
/**
* Processes the ContainerReport.
*
* @param datanodeDetails Datanode from which this report was received
* @param replicas list of ContainerReplicaProto
*/
private void processContainerReplicas(final DatanodeDetails datanodeDetails,
final List<ContainerReplicaProto> replicas) {
for (ContainerReplicaProto replicaProto : replicas) {
try {
processContainerReplica(datanodeDetails, replicaProto);
} catch (ContainerNotFoundException e) {
LOG.error("Received container report for an unknown container" +
" {} from datanode {}.", replicaProto.getContainerID(),
datanodeDetails, e);
} catch (IOException e) {
LOG.error("Exception while processing container report for container" +
" {} from datanode {}.", replicaProto.getContainerID(),
datanodeDetails, e);
}
}
}
/**
* Process the missing replica on the given datanode.
*
* @param datanodeDetails DatanodeDetails
* @param missingReplicas ContainerID which are missing on the given datanode
*/
private void processMissingReplicas(final DatanodeDetails datanodeDetails,
final Set<ContainerID> missingReplicas) {
for (ContainerID id : missingReplicas) {
try {
containerManager.getContainerReplicas(id).stream()
.filter(replica -> replica.getDatanodeDetails()
.equals(datanodeDetails)).findFirst()
.ifPresent(replica -> {
try {
containerManager.removeContainerReplica(id, replica);
} catch (ContainerNotFoundException |
ContainerReplicaNotFoundException ignored) {
// This should not happen, but even if it happens, not an issue
}
});
} catch (ContainerNotFoundException e) {
LOG.warn("Cannot remove container replica, container {} not found.",
id, e);
}
}
}
/**
* Updates the Delete Transaction Id for the given datanode.
*
* @param datanodeDetails DatanodeDetails
* @param replicas List of ContainerReplicaProto
* @param publisher EventPublisher reference
*/
private void updateDeleteTransaction(final DatanodeDetails datanodeDetails,
final List<ContainerReplicaProto> replicas,
final EventPublisher publisher) {
final PendingDeleteStatusList pendingDeleteStatusList =
new PendingDeleteStatusList(datanodeDetails);
for (ContainerReplicaProto replica : replicas) {
try {
final ContainerInfo containerInfo = containerManager.getContainer(
ContainerID.valueof(replica.getContainerID()));
if (containerInfo.getDeleteTransactionId() >
replica.getDeleteTransactionId()) {
pendingDeleteStatusList.addPendingDeleteStatus(
replica.getDeleteTransactionId(),
containerInfo.getDeleteTransactionId(),
containerInfo.getContainerID());
}
} catch (ContainerNotFoundException cnfe) {
LOG.warn("Cannot update pending delete transaction for " +
"container #{}. Reason: container missing.",
replica.getContainerID());
}
}
if (pendingDeleteStatusList.getNumPendingDeletes() > 0) {
publisher.fireEvent(SCMEvents.PENDING_DELETE_STATUS,
pendingDeleteStatusList);
}
}
}