/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the
 * License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package org.apache.hadoop.hdds.scm.server;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.protobuf.BlockingService;

import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.NodeReportProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ReregisterCommandProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto;
import org.apache.hadoop.hdds.scm.HddsServerUtil;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ReportFromDatanode;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol;
import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolPB;
import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolServerSideTranslatorPB;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.LinkedList;
import java.util.List;
import java.util.stream.Collectors;

import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.closeContainerCommand;
import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.deleteBlocksCommand;
import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.replicateContainerCommand;
import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.reregisterCommand;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HANDLER_COUNT_DEFAULT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HANDLER_COUNT_KEY;
import static org.apache.hadoop.hdds.scm.events.SCMEvents.PIPELINE_REPORT;
import static org.apache.hadoop.hdds.scm.server.StorageContainerManager.startRpcServer;
import static org.apache.hadoop.hdds.server.ServerUtils.updateRPCListenAddress;

/**
 * Protocol handler for the datanode protocol: the server-side RPC endpoint
 * in SCM that datanodes contact for version negotiation, registration and
 * heartbeats.
 */
public class SCMDatanodeProtocolServer implements
StorageContainerDatanodeProtocol {
private static final Logger LOG = LoggerFactory.getLogger(
SCMDatanodeProtocolServer.class);
/**
* The RPC server that listens to requests from DataNodes.
*/
private final RPC.Server datanodeRpcServer;
private final StorageContainerManager scm;
private final InetSocketAddress datanodeRpcAddress;
private final SCMDatanodeHeartbeatDispatcher heartbeatDispatcher;
private final EventPublisher eventPublisher;
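
  /**
   * Creates the RPC endpoint that datanodes use for version negotiation,
   * registration and heartbeats.
   *
   * @param conf configuration used to resolve the bind address and the
   *             RPC handler count.
   * @param scm the StorageContainerManager this server belongs to.
   * @param eventPublisher publisher used to fire report events onto the
   *                       SCM event queue.
   * @throws IOException if the RPC server cannot be created.
   */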
public SCMDatanodeProtocolServer(final OzoneConfiguration conf,
StorageContainerManager scm, EventPublisher eventPublisher)
throws IOException {
Preconditions.checkNotNull(scm, "SCM cannot be null");
Preconditions.checkNotNull(eventPublisher, "EventPublisher cannot be null");
this.scm = scm;
this.eventPublisher = eventPublisher;
final int handlerCount =
conf.getInt(OZONE_SCM_HANDLER_COUNT_KEY,
OZONE_SCM_HANDLER_COUNT_DEFAULT);
heartbeatDispatcher = new SCMDatanodeHeartbeatDispatcher(
scm.getScmNodeManager(), eventPublisher);
RPC.setProtocolEngine(conf, StorageContainerDatanodeProtocolPB.class,
ProtobufRpcEngine.class);
BlockingService dnProtoPbService =
StorageContainerDatanodeProtocolProtos
.StorageContainerDatanodeProtocolService
.newReflectiveBlockingService(
new StorageContainerDatanodeProtocolServerSideTranslatorPB(
this));
InetSocketAddress datanodeRpcAddr =
HddsServerUtil.getScmDataNodeBindAddress(conf);
datanodeRpcServer =
startRpcServer(
conf,
datanodeRpcAddr,
StorageContainerDatanodeProtocolPB.class,
dnProtoPbService,
handlerCount);
datanodeRpcAddress =
updateRPCListenAddress(
conf, OZONE_SCM_DATANODE_ADDRESS_KEY, datanodeRpcAddr,
datanodeRpcServer);
}
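
  /**
   * Starts the RPC server so it begins accepting datanode requests.
   *
   * <p>A minimal lifecycle sketch; the variable names and the surrounding
   * wiring are illustrative assumptions, not part of this class:
   *
   * <pre>{@code
   * SCMDatanodeProtocolServer server =
   *     new SCMDatanodeProtocolServer(conf, scm, eventQueue);
   * server.start();   // begin serving datanode RPCs
   * // ... later, during shutdown:
   * server.stop();
   * server.join();
   * }</pre>
   */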
public void start() {
LOG.info(
StorageContainerManager.buildRpcServerStartMessage(
"RPC server for DataNodes", datanodeRpcAddress));
datanodeRpcServer.start();
}
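
  /**
   * Returns the address the datanode RPC server is actually listening on.
   */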
public InetSocketAddress getDatanodeRpcAddress() {
return datanodeRpcAddress;
}
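
  /**
   * Handles the version handshake from a datanode and returns SCM's
   * version information.
   */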
@Override
public SCMVersionResponseProto getVersion(SCMVersionRequestProto
versionRequest)
throws IOException {
return scm.getScmNodeManager().getVersion(versionRequest)
.getProtobufMessage();
}
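
  /**
   * Registers a datanode with SCM. On successful registration, the initial
   * container and pipeline reports are processed and re-published on the
   * event queue so that other SCM services can react to them.
   */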
@Override
public SCMRegisteredResponseProto register(
HddsProtos.DatanodeDetailsProto datanodeDetailsProto,
NodeReportProto nodeReport,
ContainerReportsProto containerReportsProto,
PipelineReportsProto pipelineReportsProto)
throws IOException {
DatanodeDetails datanodeDetails = DatanodeDetails
.getFromProtoBuf(datanodeDetailsProto);
    // TODO : Return the list of nodes that form the SCM HA.
RegisteredCommand registeredCommand = scm.getScmNodeManager()
.register(datanodeDetails, nodeReport, pipelineReportsProto);
if (registeredCommand.getError()
== SCMRegisteredResponseProto.ErrorCode.success) {
scm.getContainerManager().processContainerReports(datanodeDetails,
containerReportsProto, true);
eventPublisher.fireEvent(SCMEvents.NODE_REGISTRATION_CONT_REPORT,
new NodeRegistrationContainerReport(datanodeDetails,
containerReportsProto));
eventPublisher.fireEvent(PIPELINE_REPORT,
new PipelineReportFromDatanode(datanodeDetails,
pipelineReportsProto));
}
return getRegisteredResponse(registeredCommand);
}
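
  /**
   * Converts a {@link RegisteredCommand} into the protobuf response that
   * is returned to the datanode.
   */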
@VisibleForTesting
public static SCMRegisteredResponseProto getRegisteredResponse(
RegisteredCommand cmd) {
return SCMRegisteredResponseProto.newBuilder()
// TODO : Fix this later when we have multiple SCM support.
// .setAddressList(addressList)
.setErrorCode(cmd.getError())
.setClusterID(cmd.getClusterID())
.setDatanodeUUID(cmd.getDatanodeUUID())
.build();
}
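
  /**
   * Handles a periodic heartbeat from a datanode. The heartbeat is handed
   * to the dispatcher, and every {@link SCMCommand} queued for that
   * datanode is translated to protobuf and returned in the response.
   */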
@Override
public SCMHeartbeatResponseProto sendHeartbeat(
SCMHeartbeatRequestProto heartbeat) throws IOException {
List<SCMCommandProto> cmdResponses = new LinkedList<>();
for (SCMCommand cmd : heartbeatDispatcher.dispatch(heartbeat)) {
cmdResponses.add(getCommandResponse(cmd));
}
return SCMHeartbeatResponseProto.newBuilder()
.setDatanodeUUID(heartbeat.getDatanodeDetails().getUuid())
.addAllCommands(cmdResponses).build();
}
  /**
   * Builds the SCMCommandProto wire representation of the given SCM
   * command.
   *
   * @param cmd the command to translate
   * @return SCMCommandProto
   * @throws IOException
   */
@VisibleForTesting
public SCMCommandProto getCommandResponse(SCMCommand cmd)
throws IOException {
SCMCommandProto.Builder builder =
SCMCommandProto.newBuilder();
switch (cmd.getType()) {
case reregisterCommand:
return builder
.setCommandType(reregisterCommand)
.setReregisterCommandProto(ReregisterCommandProto
.getDefaultInstance())
.build();
case deleteBlocksCommand:
      // Once SCM sends out the deletion message, increment the count.
      // This is done here instead of when SCM receives the ACK, because
      // the DN might not be able to respond with the ACK for some time.
      // If it times out, SCM needs to re-send the message a few more times.
List<Long> txs =
((DeleteBlocksCommand) cmd)
.blocksTobeDeleted()
.stream()
.map(tx -> tx.getTxID())
.collect(Collectors.toList());
scm.getScmBlockManager().getDeletedBlockLog().incrementCount(txs);
return builder
.setCommandType(deleteBlocksCommand)
.setDeleteBlocksCommandProto(((DeleteBlocksCommand) cmd).getProto())
.build();
case closeContainerCommand:
return builder
.setCommandType(closeContainerCommand)
.setCloseContainerCommandProto(
((CloseContainerCommand) cmd).getProto())
.build();
case replicateContainerCommand:
return builder
.setCommandType(replicateContainerCommand)
.setReplicateContainerCommandProto(
((ReplicateContainerCommand)cmd).getProto())
.build();
default:
      throw new IllegalArgumentException("Unknown SCM command type: "
          + cmd.getType());
}
}
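
  /**
   * Blocks the calling thread until the datanode RPC server terminates.
   */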
public void join() throws InterruptedException {
LOG.trace("Join RPC server for DataNodes");
datanodeRpcServer.join();
}
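
  /**
   * Stops the datanode RPC server and releases resources held by the
   * node manager.
   */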
public void stop() {
try {
LOG.info("Stopping the RPC server for DataNodes");
datanodeRpcServer.stop();
} catch (Exception ex) {
LOG.error(" datanodeRpcServer stop failed.", ex);
}
IOUtils.cleanupWithLogger(LOG, scm.getScmNodeManager());
}

  /**
   * Event payload that pairs a registering datanode's initial container
   * report with the details of the datanode it came from.
   */
public static class NodeRegistrationContainerReport extends
ReportFromDatanode<ContainerReportsProto> {
public NodeRegistrationContainerReport(DatanodeDetails datanodeDetails,
ContainerReportsProto report) {
super(datanodeDetails, report);
}
}
}