| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.ozone.container.common.states.endpoint; |
| |
| import com.google.common.base.Preconditions; |
| import com.google.protobuf.Descriptors; |
| import com.google.protobuf.GeneratedMessage; |
| import org.apache.hadoop.hdds.conf.ConfigurationSource; |
| import org.apache.hadoop.hdds.protocol.DatanodeDetails; |
| import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto; |
| import org.apache.hadoop.hdds.protocol.proto |
| .StorageContainerDatanodeProtocolProtos.PipelineActionsProto; |
| import org.apache.hadoop.hdds.protocol.proto |
| .StorageContainerDatanodeProtocolProtos.PipelineAction; |
| import org.apache.hadoop.hdds.protocol.proto |
| .StorageContainerDatanodeProtocolProtos.ContainerActionsProto; |
| import org.apache.hadoop.hdds.protocol.proto |
| .StorageContainerDatanodeProtocolProtos.ContainerAction; |
| import org.apache.hadoop.hdds.protocol.proto |
| .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto; |
| import org.apache.hadoop.hdds.protocol.proto |
| .StorageContainerDatanodeProtocolProtos.SCMCommandProto; |
| import org.apache.hadoop.hdds.protocol.proto |
| .StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto; |
| import org.apache.hadoop.ozone.container.common.helpers |
| .DeletedContainerBlocksSummary; |
| import org.apache.hadoop.ozone.container.common.statemachine |
| .EndpointStateMachine; |
| import org.apache.hadoop.ozone.container.common.statemachine |
| .EndpointStateMachine.EndPointStates; |
| import org.apache.hadoop.ozone.container.common.statemachine.StateContext; |
| import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand; |
| import org.apache.hadoop.ozone.protocol.commands.ClosePipelineCommand; |
| import org.apache.hadoop.ozone.protocol.commands.CreatePipelineCommand; |
| import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand; |
| import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand; |
| import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand; |
| |
| import org.apache.hadoop.ozone.protocol.commands.SetNodeOperationalStateCommand; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.io.IOException; |
| import java.time.ZonedDateTime; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.concurrent.Callable; |
| |
| import static org.apache.hadoop.hdds.HddsConfigKeys |
| .HDDS_CONTAINER_ACTION_MAX_LIMIT; |
| import static org.apache.hadoop.hdds.HddsConfigKeys |
| .HDDS_CONTAINER_ACTION_MAX_LIMIT_DEFAULT; |
| import static org.apache.hadoop.hdds.HddsConfigKeys |
| .HDDS_PIPELINE_ACTION_MAX_LIMIT; |
| import static org.apache.hadoop.hdds.HddsConfigKeys |
| .HDDS_PIPELINE_ACTION_MAX_LIMIT_DEFAULT; |
| |
| /** |
| * Heartbeat class for SCMs. |
| */ |
| public class HeartbeatEndpointTask |
| implements Callable<EndpointStateMachine.EndPointStates> { |
| static final Logger LOG = |
| LoggerFactory.getLogger(HeartbeatEndpointTask.class); |
| private final EndpointStateMachine rpcEndpoint; |
| private final ConfigurationSource conf; |
| private DatanodeDetailsProto datanodeDetailsProto; |
| private StateContext context; |
| private int maxContainerActionsPerHB; |
| private int maxPipelineActionsPerHB; |
| |
| /** |
| * Constructs a SCM heart beat. |
| * |
| * @param conf Config. |
| */ |
| public HeartbeatEndpointTask(EndpointStateMachine rpcEndpoint, |
| ConfigurationSource conf, StateContext context) { |
| this.rpcEndpoint = rpcEndpoint; |
| this.conf = conf; |
| this.context = context; |
| this.maxContainerActionsPerHB = conf.getInt(HDDS_CONTAINER_ACTION_MAX_LIMIT, |
| HDDS_CONTAINER_ACTION_MAX_LIMIT_DEFAULT); |
| this.maxPipelineActionsPerHB = conf.getInt(HDDS_PIPELINE_ACTION_MAX_LIMIT, |
| HDDS_PIPELINE_ACTION_MAX_LIMIT_DEFAULT); |
| } |
| |
| /** |
| * Get the container Node ID proto. |
| * |
| * @return ContainerNodeIDProto |
| */ |
| public DatanodeDetailsProto getDatanodeDetailsProto() { |
| return datanodeDetailsProto; |
| } |
| |
| /** |
| * Set container node ID proto. |
| * |
| * @param datanodeDetailsProto - the node id. |
| */ |
| public void setDatanodeDetailsProto(DatanodeDetailsProto |
| datanodeDetailsProto) { |
| this.datanodeDetailsProto = datanodeDetailsProto; |
| } |
| |
| /** |
| * Computes a result, or throws an exception if unable to do so. |
| * |
| * @return computed result |
| * @throws Exception if unable to compute a result |
| */ |
| @Override |
| public EndpointStateMachine.EndPointStates call() throws Exception { |
| rpcEndpoint.lock(); |
| SCMHeartbeatRequestProto.Builder requestBuilder = null; |
| try { |
| Preconditions.checkState(this.datanodeDetailsProto != null); |
| |
| requestBuilder = SCMHeartbeatRequestProto.newBuilder() |
| .setDatanodeDetails(datanodeDetailsProto); |
| addReports(requestBuilder); |
| addContainerActions(requestBuilder); |
| addPipelineActions(requestBuilder); |
| SCMHeartbeatRequestProto request = requestBuilder.build(); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Sending heartbeat message :: {}", request.toString()); |
| } |
| SCMHeartbeatResponseProto response = rpcEndpoint.getEndPoint() |
| .sendHeartbeat(request); |
| processResponse(response, datanodeDetailsProto); |
| rpcEndpoint.setLastSuccessfulHeartbeat(ZonedDateTime.now()); |
| rpcEndpoint.zeroMissedCount(); |
| } catch (IOException ex) { |
| Preconditions.checkState(requestBuilder != null); |
| // put back the reports which failed to be sent |
| putBackIncrementalReports(requestBuilder); |
| rpcEndpoint.logIfNeeded(ex); |
| } finally { |
| rpcEndpoint.unlock(); |
| } |
| return rpcEndpoint.getState(); |
| } |
| |
| // TODO: Make it generic. |
| private void putBackIncrementalReports( |
| SCMHeartbeatRequestProto.Builder requestBuilder) { |
| List<GeneratedMessage> reports = new LinkedList<>(); |
| // We only put back CommandStatusReports and IncrementalContainerReport |
| // because those are incremental. Container/Node/PipelineReport are |
| // accumulative so we can keep only the latest of each. |
| if (requestBuilder.getCommandStatusReportsCount() != 0) { |
| reports.addAll(requestBuilder.getCommandStatusReportsList()); |
| } |
| if (requestBuilder.getIncrementalContainerReportCount() != 0) { |
| reports.addAll(requestBuilder.getIncrementalContainerReportList()); |
| } |
| context.putBackReports(reports, rpcEndpoint.getAddress()); |
| } |
| |
| /** |
| * Adds all the available reports to heartbeat. |
| * |
| * @param requestBuilder builder to which the report has to be added. |
| */ |
| private void addReports(SCMHeartbeatRequestProto.Builder requestBuilder) { |
| for (GeneratedMessage report : |
| context.getAllAvailableReports(rpcEndpoint.getAddress())) { |
| String reportName = report.getDescriptorForType().getFullName(); |
| for (Descriptors.FieldDescriptor descriptor : |
| SCMHeartbeatRequestProto.getDescriptor().getFields()) { |
| String heartbeatFieldName = descriptor.getMessageType().getFullName(); |
| if (heartbeatFieldName.equals(reportName)) { |
| if (descriptor.isRepeated()) { |
| requestBuilder.addRepeatedField(descriptor, report); |
| } else { |
| requestBuilder.setField(descriptor, report); |
| } |
| break; |
| } |
| } |
| } |
| } |
| |
| /** |
| * Adds all the pending ContainerActions to the heartbeat. |
| * |
| * @param requestBuilder builder to which the report has to be added. |
| */ |
| private void addContainerActions( |
| SCMHeartbeatRequestProto.Builder requestBuilder) { |
| List<ContainerAction> actions = context.getPendingContainerAction( |
| rpcEndpoint.getAddress(), maxContainerActionsPerHB); |
| if (!actions.isEmpty()) { |
| ContainerActionsProto cap = ContainerActionsProto.newBuilder() |
| .addAllContainerActions(actions) |
| .build(); |
| requestBuilder.setContainerActions(cap); |
| } |
| } |
| |
| /** |
| * Adds all the pending PipelineActions to the heartbeat. |
| * |
| * @param requestBuilder builder to which the report has to be added. |
| */ |
| private void addPipelineActions( |
| SCMHeartbeatRequestProto.Builder requestBuilder) { |
| List<PipelineAction> actions = context.getPendingPipelineAction( |
| rpcEndpoint.getAddress(), maxPipelineActionsPerHB); |
| if (!actions.isEmpty()) { |
| PipelineActionsProto pap = PipelineActionsProto.newBuilder() |
| .addAllPipelineActions(actions) |
| .build(); |
| requestBuilder.setPipelineActions(pap); |
| } |
| } |
| |
| /** |
| * Returns a builder class for HeartbeatEndpointTask task. |
| * @return Builder. |
| */ |
| public static Builder newBuilder() { |
| return new Builder(); |
| } |
| |
| /** |
| * Add this command to command processing Queue. |
| * |
| * @param response - SCMHeartbeat response. |
| */ |
| private void processResponse(SCMHeartbeatResponseProto response, |
| final DatanodeDetailsProto datanodeDetails) { |
| Preconditions.checkState(response.getDatanodeUUID() |
| .equalsIgnoreCase(datanodeDetails.getUuid()), |
| "Unexpected datanode ID in the response."); |
| // Verify the response is indeed for this datanode. |
| for (SCMCommandProto commandResponseProto : response |
| .getCommandsList()) { |
| switch (commandResponseProto.getCommandType()) { |
| case reregisterCommand: |
| if (rpcEndpoint.getState() == EndPointStates.HEARTBEAT) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Received SCM notification to register." |
| + " Interrupt HEARTBEAT and transit to REGISTER state."); |
| } |
| rpcEndpoint.setState(EndPointStates.REGISTER); |
| } else { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Illegal state {} found, expecting {}.", |
| rpcEndpoint.getState().name(), EndPointStates.HEARTBEAT); |
| } |
| } |
| break; |
| case deleteBlocksCommand: |
| DeleteBlocksCommand db = DeleteBlocksCommand |
| .getFromProtobuf( |
| commandResponseProto.getDeleteBlocksCommandProto()); |
| if (commandResponseProto.hasTerm()) { |
| db.setTerm(commandResponseProto.getTerm()); |
| } |
| if (!db.blocksTobeDeleted().isEmpty()) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug(DeletedContainerBlocksSummary |
| .getFrom(db.blocksTobeDeleted()) |
| .toString()); |
| } |
| this.context.addCommand(db); |
| } |
| break; |
| case closeContainerCommand: |
| CloseContainerCommand closeContainer = |
| CloseContainerCommand.getFromProtobuf( |
| commandResponseProto.getCloseContainerCommandProto()); |
| if (commandResponseProto.hasTerm()) { |
| closeContainer.setTerm(commandResponseProto.getTerm()); |
| } |
| if (commandResponseProto.hasEncodedToken()) { |
| closeContainer.setEncodedToken( |
| commandResponseProto.getEncodedToken()); |
| } |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Received SCM container close request for container {}", |
| closeContainer.getContainerID()); |
| } |
| this.context.addCommand(closeContainer); |
| break; |
| case replicateContainerCommand: |
| ReplicateContainerCommand replicateContainerCommand = |
| ReplicateContainerCommand.getFromProtobuf( |
| commandResponseProto.getReplicateContainerCommandProto()); |
| if (commandResponseProto.hasTerm()) { |
| replicateContainerCommand.setTerm(commandResponseProto.getTerm()); |
| } |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Received SCM container replicate request for container {}", |
| replicateContainerCommand.getContainerID()); |
| } |
| this.context.addCommand(replicateContainerCommand); |
| break; |
| case deleteContainerCommand: |
| DeleteContainerCommand deleteContainerCommand = |
| DeleteContainerCommand.getFromProtobuf( |
| commandResponseProto.getDeleteContainerCommandProto()); |
| if (commandResponseProto.hasTerm()) { |
| deleteContainerCommand.setTerm(commandResponseProto.getTerm()); |
| } |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Received SCM delete container request for container {}", |
| deleteContainerCommand.getContainerID()); |
| } |
| this.context.addCommand(deleteContainerCommand); |
| break; |
| case createPipelineCommand: |
| CreatePipelineCommand createPipelineCommand = |
| CreatePipelineCommand.getFromProtobuf( |
| commandResponseProto.getCreatePipelineCommandProto()); |
| if (commandResponseProto.hasTerm()) { |
| createPipelineCommand.setTerm(commandResponseProto.getTerm()); |
| } |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Received SCM create pipeline request {}", |
| createPipelineCommand.getPipelineID()); |
| } |
| this.context.addCommand(createPipelineCommand); |
| break; |
| case closePipelineCommand: |
| ClosePipelineCommand closePipelineCommand = |
| ClosePipelineCommand.getFromProtobuf( |
| commandResponseProto.getClosePipelineCommandProto()); |
| if (commandResponseProto.hasTerm()) { |
| closePipelineCommand.setTerm(commandResponseProto.getTerm()); |
| } |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Received SCM close pipeline request {}", |
| closePipelineCommand.getPipelineID()); |
| } |
| this.context.addCommand(closePipelineCommand); |
| break; |
| case setNodeOperationalStateCommand: |
| SetNodeOperationalStateCommand setNodeOperationalStateCommand = |
| SetNodeOperationalStateCommand.getFromProtobuf( |
| commandResponseProto.getSetNodeOperationalStateCommandProto()); |
| if (commandResponseProto.hasTerm()) { |
| setNodeOperationalStateCommand.setTerm( |
| commandResponseProto.getTerm()); |
| } |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Received SCM set operational state command. State: {} " + |
| "Expiry: {}", setNodeOperationalStateCommand.getOpState(), |
| setNodeOperationalStateCommand.getStateExpiryEpochSeconds()); |
| } |
| this.context.addCommand(setNodeOperationalStateCommand); |
| break; |
| default: |
| throw new IllegalArgumentException("Unknown response : " |
| + commandResponseProto.getCommandType().name()); |
| } |
| } |
| } |
| |
| /** |
| * Builder class for HeartbeatEndpointTask. |
| */ |
| public static class Builder { |
| private EndpointStateMachine endPointStateMachine; |
| private ConfigurationSource conf; |
| private DatanodeDetails datanodeDetails; |
| private StateContext context; |
| |
| /** |
| * Constructs the builder class. |
| */ |
| public Builder() { |
| } |
| |
| /** |
| * Sets the endpoint state machine. |
| * |
| * @param rpcEndPoint - Endpoint state machine. |
| * @return Builder |
| */ |
| public Builder setEndpointStateMachine(EndpointStateMachine rpcEndPoint) { |
| this.endPointStateMachine = rpcEndPoint; |
| return this; |
| } |
| |
| /** |
| * Sets the Config. |
| * |
| * @param config - config |
| * @return Builder |
| */ |
| public Builder setConfig(ConfigurationSource config) { |
| this.conf = config; |
| return this; |
| } |
| |
| /** |
| * Sets the NodeID. |
| * |
| * @param dnDetails - NodeID proto |
| * @return Builder |
| */ |
| public Builder setDatanodeDetails(DatanodeDetails dnDetails) { |
| this.datanodeDetails = dnDetails; |
| return this; |
| } |
| |
| /** |
| * Sets the context. |
| * @param stateContext - State context. |
| * @return this. |
| */ |
| public Builder setContext(StateContext stateContext) { |
| this.context = stateContext; |
| return this; |
| } |
| |
| public HeartbeatEndpointTask build() { |
| if (endPointStateMachine == null) { |
| LOG.error("No endpoint specified."); |
| throw new IllegalArgumentException("A valid endpoint state machine is" + |
| " needed to construct HeartbeatEndpointTask task"); |
| } |
| |
| if (conf == null) { |
| LOG.error("No config specified."); |
| throw new IllegalArgumentException("A valid configration is needed to" + |
| " construct HeartbeatEndpointTask task"); |
| } |
| |
| if (datanodeDetails == null) { |
| LOG.error("No datanode specified."); |
| throw new IllegalArgumentException("A valid Node ID is needed to " + |
| "construct HeartbeatEndpointTask task"); |
| } |
| |
| HeartbeatEndpointTask task = new HeartbeatEndpointTask(this |
| .endPointStateMachine, this.conf, this.context); |
| task.setDatanodeDetailsProto(datanodeDetails.getProtoBufMessage()); |
| return task; |
| } |
| } |
| } |