blob: 9f6077b4f706f6e2db19e083b543d4253ce9ee9a [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.hdds.scm.server;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.IncrementalContainerReportProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.PipelineActionsProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerActionsProto;
import org.apache.hadoop.hdds.protocol.proto.
StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.NodeReportProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.ozone.protocol.commands.ReregisterCommand;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import com.google.protobuf.GeneratedMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.UUID;
import static org.apache.hadoop.hdds.scm.events.SCMEvents.CONTAINER_ACTIONS;
import static org.apache.hadoop.hdds.scm.events.SCMEvents.CONTAINER_REPORT;
import static org.apache.hadoop.hdds.scm.events.SCMEvents
.INCREMENTAL_CONTAINER_REPORT;
import static org.apache.hadoop.hdds.scm.events.SCMEvents.NODE_REPORT;
import static org.apache.hadoop.hdds.scm.events.SCMEvents.CMD_STATUS_REPORT;
import static org.apache.hadoop.hdds.scm.events.SCMEvents.PIPELINE_ACTIONS;
import static org.apache.hadoop.hdds.scm.events.SCMEvents.PIPELINE_REPORT;
/**
* This class is responsible for dispatching heartbeat from datanode to
* appropriate EventHandler at SCM.
*/
public final class SCMDatanodeHeartbeatDispatcher {
private static final Logger LOG =
LoggerFactory.getLogger(SCMDatanodeHeartbeatDispatcher.class);
private final NodeManager nodeManager;
private final EventPublisher eventPublisher;
public SCMDatanodeHeartbeatDispatcher(NodeManager nodeManager,
EventPublisher eventPublisher) {
Preconditions.checkNotNull(nodeManager);
Preconditions.checkNotNull(eventPublisher);
this.nodeManager = nodeManager;
this.eventPublisher = eventPublisher;
}
/**
* Dispatches heartbeat to registered event handlers.
*
* @param heartbeat heartbeat to be dispatched.
*
* @return list of SCMCommand
*/
public List<SCMCommand> dispatch(SCMHeartbeatRequestProto heartbeat) {
DatanodeDetails datanodeDetails =
DatanodeDetails.getFromProtoBuf(heartbeat.getDatanodeDetails());
List<SCMCommand> commands;
// If node is not registered, ask the node to re-register. Do not process
// Heartbeat for unregistered nodes.
if (!nodeManager.isNodeRegistered(datanodeDetails)) {
LOG.info("SCM received heartbeat from an unregistered datanode {}. " +
"Asking datanode to re-register.", datanodeDetails);
UUID dnID = datanodeDetails.getUuid();
nodeManager.addDatanodeCommand(dnID, new ReregisterCommand());
commands = nodeManager.getCommandQueue(dnID);
} else {
// should we dispatch heartbeat through eventPublisher?
commands = nodeManager.processHeartbeat(datanodeDetails);
if (heartbeat.hasNodeReport()) {
LOG.debug("Dispatching Node Report.");
eventPublisher.fireEvent(
NODE_REPORT,
new NodeReportFromDatanode(
datanodeDetails,
heartbeat.getNodeReport()));
}
if (heartbeat.hasContainerReport()) {
LOG.debug("Dispatching Container Report.");
eventPublisher.fireEvent(
CONTAINER_REPORT,
new ContainerReportFromDatanode(
datanodeDetails,
heartbeat.getContainerReport()));
}
final List<IncrementalContainerReportProto> icrs =
heartbeat.getIncrementalContainerReportList();
if (icrs.size() > 0) {
LOG.debug("Dispatching ICRs.");
for (IncrementalContainerReportProto icr : icrs) {
eventPublisher.fireEvent(INCREMENTAL_CONTAINER_REPORT,
new IncrementalContainerReportFromDatanode(
datanodeDetails, icr));
}
}
if (heartbeat.hasContainerActions()) {
LOG.debug("Dispatching Container Actions.");
eventPublisher.fireEvent(
CONTAINER_ACTIONS,
new ContainerActionsFromDatanode(
datanodeDetails,
heartbeat.getContainerActions()));
}
if (heartbeat.hasPipelineReports()) {
LOG.debug("Dispatching Pipeline Report.");
eventPublisher.fireEvent(
PIPELINE_REPORT,
new PipelineReportFromDatanode(
datanodeDetails,
heartbeat.getPipelineReports()));
}
if (heartbeat.hasPipelineActions()) {
LOG.debug("Dispatching Pipeline Actions.");
eventPublisher.fireEvent(
PIPELINE_ACTIONS,
new PipelineActionsFromDatanode(
datanodeDetails,
heartbeat.getPipelineActions()));
}
if (heartbeat.getCommandStatusReportsCount() != 0) {
for (CommandStatusReportsProto commandStatusReport : heartbeat
.getCommandStatusReportsList()) {
eventPublisher.fireEvent(
CMD_STATUS_REPORT,
new CommandStatusReportFromDatanode(
datanodeDetails,
commandStatusReport));
}
}
}
return commands;
}
/**
* Wrapper class for events with the datanode origin.
*/
public static class ReportFromDatanode<T extends GeneratedMessage> {
private final DatanodeDetails datanodeDetails;
private final T report;
public ReportFromDatanode(DatanodeDetails datanodeDetails, T report) {
this.datanodeDetails = datanodeDetails;
this.report = report;
}
public DatanodeDetails getDatanodeDetails() {
return datanodeDetails;
}
public T getReport() {
return report;
}
}
/**
* Node report event payload with origin.
*/
public static class NodeReportFromDatanode
extends ReportFromDatanode<NodeReportProto> {
public NodeReportFromDatanode(DatanodeDetails datanodeDetails,
NodeReportProto report) {
super(datanodeDetails, report);
}
}
/**
* Container report event payload with origin.
*/
public static class ContainerReportFromDatanode
extends ReportFromDatanode<ContainerReportsProto> {
public ContainerReportFromDatanode(DatanodeDetails datanodeDetails,
ContainerReportsProto report) {
super(datanodeDetails, report);
}
}
/**
* Incremental Container report event payload with origin.
*/
public static class IncrementalContainerReportFromDatanode
extends ReportFromDatanode<IncrementalContainerReportProto> {
public IncrementalContainerReportFromDatanode(
DatanodeDetails datanodeDetails,
IncrementalContainerReportProto report) {
super(datanodeDetails, report);
}
}
/**
* Container action event payload with origin.
*/
public static class ContainerActionsFromDatanode
extends ReportFromDatanode<ContainerActionsProto> {
public ContainerActionsFromDatanode(DatanodeDetails datanodeDetails,
ContainerActionsProto actions) {
super(datanodeDetails, actions);
}
}
/**
* Pipeline report event payload with origin.
*/
public static class PipelineReportFromDatanode
extends ReportFromDatanode<PipelineReportsProto> {
public PipelineReportFromDatanode(DatanodeDetails datanodeDetails,
PipelineReportsProto report) {
super(datanodeDetails, report);
}
}
/**
* Pipeline action event payload with origin.
*/
public static class PipelineActionsFromDatanode
extends ReportFromDatanode<PipelineActionsProto> {
public PipelineActionsFromDatanode(DatanodeDetails datanodeDetails,
PipelineActionsProto actions) {
super(datanodeDetails, actions);
}
}
/**
* Container report event payload with origin.
*/
public static class CommandStatusReportFromDatanode
extends ReportFromDatanode<CommandStatusReportsProto> {
public CommandStatusReportFromDatanode(DatanodeDetails datanodeDetails,
CommandStatusReportsProto report) {
super(datanodeDetails, report);
}
}
}