blob: eb3cf7ddbdd22171f3c8b3ed63b54183aa0a3d1a [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.container;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
import org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.ratis.protocol.exceptions.NotLeaderException;
import org.slf4j.Logger;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
/**
* Base class for all the container report handlers.
*/
public class AbstractContainerReportHandler {
private final ContainerManager containerManager;
private final SCMContext scmContext;
private final Logger logger;
/**
* Constructs AbstractContainerReportHandler instance with the
* given ContainerManager instance.
*
* @param containerManager ContainerManager
* @param logger Logger to be used for logging
*/
AbstractContainerReportHandler(final ContainerManager containerManager,
final SCMContext scmContext,
final Logger logger) {
Preconditions.checkNotNull(containerManager);
Preconditions.checkNotNull(scmContext);
Preconditions.checkNotNull(logger);
this.containerManager = containerManager;
this.scmContext = scmContext;
this.logger = logger;
}
/**
* Process the given ContainerReplica received from specified datanode.
*
* @param datanodeDetails DatanodeDetails for the DN
* @param replicaProto Protobuf representing the replicas
* @param publisher EventPublisher instance
* @throws IOException
* @throws InvalidStateTransitionException
* @throws TimeoutException
*/
protected void processContainerReplica(final DatanodeDetails datanodeDetails,
final ContainerReplicaProto replicaProto, final EventPublisher publisher)
throws IOException, InvalidStateTransitionException, TimeoutException {
ContainerInfo container = getContainerManager().getContainer(
ContainerID.valueOf(replicaProto.getContainerID()));
processContainerReplica(
datanodeDetails, container, replicaProto, publisher);
}
/**
* Process the given ContainerReplica received from specified datanode.
*
* @param datanodeDetails DatanodeDetails of the node which reported
* this replica
* @param containerInfo ContainerInfo represending the container
* @param replicaProto ContainerReplica
* @param publisher EventPublisher instance
*
* @throws IOException In case of any Exception while processing the report
* @throws TimeoutException In case of timeout while updating container state
*/
protected void processContainerReplica(final DatanodeDetails datanodeDetails,
final ContainerInfo containerInfo,
final ContainerReplicaProto replicaProto, final EventPublisher publisher)
throws IOException, InvalidStateTransitionException, TimeoutException {
final ContainerID containerId = containerInfo.containerID();
if (logger.isDebugEnabled()) {
logger.debug("Processing replica of container {} from datanode {}",
containerId, datanodeDetails);
}
// Synchronized block should be replaced by container lock,
// once we have introduced lock inside ContainerInfo.
synchronized (containerInfo) {
updateContainerStats(datanodeDetails, containerInfo, replicaProto);
if (!updateContainerState(datanodeDetails, containerInfo, replicaProto,
publisher)) {
updateContainerReplica(datanodeDetails, containerId, replicaProto);
}
}
}
/**
* Update the container stats if it's lagging behind the stats in reported
* replica.
*
* @param containerInfo ContainerInfo representing the container
* @param replicaProto Container Replica information
* @throws ContainerNotFoundException If the container is not present
*/
private void updateContainerStats(final DatanodeDetails datanodeDetails,
final ContainerInfo containerInfo,
final ContainerReplicaProto replicaProto)
throws ContainerNotFoundException {
if (isHealthy(replicaProto::getState)) {
if (containerInfo.getSequenceId() <
replicaProto.getBlockCommitSequenceId()) {
containerInfo.updateSequenceId(
replicaProto.getBlockCommitSequenceId());
}
if (containerInfo.getReplicationConfig().getReplicationType()
== HddsProtos.ReplicationType.EC) {
updateECContainerStats(containerInfo, replicaProto, datanodeDetails);
} else {
updateRatisContainerStats(containerInfo, replicaProto, datanodeDetails);
}
}
}
private void updateRatisContainerStats(ContainerInfo containerInfo,
ContainerReplicaProto newReplica, DatanodeDetails newSource)
throws ContainerNotFoundException {
List<ContainerReplica> otherReplicas =
getOtherReplicas(containerInfo.containerID(), newSource);
long usedBytes = newReplica.getUsed();
long keyCount = newReplica.getKeyCount();
for (ContainerReplica r : otherReplicas) {
usedBytes = calculateUsage(containerInfo, usedBytes, r.getBytesUsed());
keyCount = calculateUsage(containerInfo, keyCount, r.getKeyCount());
}
updateContainerUsedAndKeys(containerInfo, usedBytes, keyCount);
}
private void updateECContainerStats(ContainerInfo containerInfo,
ContainerReplicaProto newReplica, DatanodeDetails newSource)
throws ContainerNotFoundException {
int dataNum =
((ECReplicationConfig)containerInfo.getReplicationConfig()).getData();
// The first EC index and the parity indexes must all be the same size
// while the other data indexes may be smaller due to partial stripes.
// When calculating the stats, we only use the first data and parity and
// ignore the others. We only need to run the check if we are processing
// the first data or parity replicas.
if (newReplica.getReplicaIndex() == 1
|| newReplica.getReplicaIndex() > dataNum) {
List<ContainerReplica> otherReplicas =
getOtherReplicas(containerInfo.containerID(), newSource);
long usedBytes = newReplica.getUsed();
long keyCount = newReplica.getKeyCount();
for (ContainerReplica r : otherReplicas) {
if (r.getReplicaIndex() > 1 && r.getReplicaIndex() <= dataNum) {
// Ignore all data replicas except the first for stats
continue;
}
usedBytes = calculateUsage(containerInfo, usedBytes, r.getBytesUsed());
keyCount = calculateUsage(containerInfo, keyCount, r.getKeyCount());
}
updateContainerUsedAndKeys(containerInfo, usedBytes, keyCount);
}
}
private long calculateUsage(ContainerInfo containerInfo, long lastValue,
long thisValue) {
if (containerInfo.getState().equals(HddsProtos.LifeCycleState.OPEN)) {
// Open containers are generally growing in key count and size, the
// overall size should be the min of all reported replicas.
return Math.min(lastValue, thisValue);
} else {
// Containers which are not open can only shrink in size, so use the
// largest values reported.
return Math.max(lastValue, thisValue);
}
}
private void updateContainerUsedAndKeys(ContainerInfo containerInfo,
long usedBytes, long keyCount) {
if (containerInfo.getUsedBytes() != usedBytes) {
containerInfo.setUsedBytes(usedBytes);
}
if (containerInfo.getNumberOfKeys() != keyCount) {
containerInfo.setNumberOfKeys(keyCount);
}
}
private List<ContainerReplica> getOtherReplicas(ContainerID containerId,
DatanodeDetails exclude) throws ContainerNotFoundException {
List<ContainerReplica> filteredReplicas = new ArrayList<>();
Set<ContainerReplica> replicas
= containerManager.getContainerReplicas(containerId);
for (ContainerReplica r : replicas) {
if (!r.getDatanodeDetails().equals(exclude)) {
filteredReplicas.add(r);
}
}
return filteredReplicas;
}
/**
* Updates the container state based on the given replica state.
*
* @param datanode Datanode from which the report is received
* @param container ContainerInfo representing the the container
* @param replica ContainerReplica
* @boolean true - replica should be ignored in the next process
* @throws IOException In case of Exception
* @throws TimeoutException In case of timeout while updating container state
*/
private boolean updateContainerState(final DatanodeDetails datanode,
final ContainerInfo container,
final ContainerReplicaProto replica,
final EventPublisher publisher)
throws IOException, InvalidStateTransitionException, TimeoutException {
final ContainerID containerId = container.containerID();
boolean ignored = false;
switch (container.getState()) {
case OPEN:
/*
* If the state of a container is OPEN, datanodes cannot report
* any other state.
*/
if (replica.getState() != State.OPEN) {
logger.warn("Container {} is in OPEN state, but the datanode {} " +
"reports an {} replica.", containerId,
datanode, replica.getState());
// Should we take some action?
}
break;
case CLOSING:
/*
* When the container is in CLOSING state the replicas can be in any
* of the following states:
*
* - OPEN
* - CLOSING
* - QUASI_CLOSED
* - CLOSED
*
* If all the replica are either in OPEN or CLOSING state, do nothing.
*
* If the replica is in QUASI_CLOSED state, move the container to
* QUASI_CLOSED state.
*
* If the replica is in CLOSED state, mark the container as CLOSED.
*
*/
if (replica.getState() == State.QUASI_CLOSED) {
logger.info("Moving container {} to QUASI_CLOSED state, datanode {} " +
"reported QUASI_CLOSED replica.", containerId, datanode);
containerManager.updateContainerState(containerId,
LifeCycleEvent.QUASI_CLOSE);
}
if (replica.getState() == State.CLOSED) {
logger.info("Moving container {} to CLOSED state, datanode {} " +
"reported CLOSED replica.", containerId, datanode);
Preconditions.checkArgument(replica.getBlockCommitSequenceId()
== container.getSequenceId());
containerManager.updateContainerState(containerId,
LifeCycleEvent.CLOSE);
}
break;
case QUASI_CLOSED:
/*
* The container is in QUASI_CLOSED state, this means that at least
* one of the replica was QUASI_CLOSED.
*
* Now replicas can be in any of the following state.
*
* 1. OPEN
* 2. CLOSING
* 3. QUASI_CLOSED
* 4. CLOSED
*
* If at least one of the replica is in CLOSED state, mark the
* container as CLOSED.
*
*/
if (replica.getState() == State.CLOSED) {
logger.info("Moving container {} to CLOSED state, datanode {} " +
"reported CLOSED replica.", containerId, datanode);
Preconditions.checkArgument(replica.getBlockCommitSequenceId()
== container.getSequenceId());
containerManager.updateContainerState(containerId,
LifeCycleEvent.FORCE_CLOSE);
}
break;
case CLOSED:
/*
* The container is already in closed state. do nothing.
*/
break;
case DELETING:
/*
* The container is under deleting. do nothing.
*/
break;
case DELETED:
/*
* The container is deleted. delete the replica.
*/
deleteReplica(containerId, datanode, publisher, "DELETED");
ignored = true;
break;
default:
break;
}
return ignored;
}
private void updateContainerReplica(final DatanodeDetails datanodeDetails,
final ContainerID containerId,
final ContainerReplicaProto replicaProto)
throws ContainerNotFoundException, ContainerReplicaNotFoundException {
final ContainerReplica replica = ContainerReplica.newBuilder()
.setContainerID(containerId)
.setContainerState(replicaProto.getState())
.setDatanodeDetails(datanodeDetails)
.setOriginNodeId(UUID.fromString(replicaProto.getOriginNodeId()))
.setSequenceId(replicaProto.getBlockCommitSequenceId())
.setKeyCount(replicaProto.getKeyCount())
.setReplicaIndex(replicaProto.getReplicaIndex())
.setBytesUsed(replicaProto.getUsed())
.build();
if (replica.getState().equals(State.DELETED)) {
containerManager.removeContainerReplica(containerId, replica);
} else {
containerManager.updateContainerReplica(containerId, replica);
}
}
/**
* Returns true if the container replica is HEALTHY. <br>
* A replica is considered healthy if it's not in UNHEALTHY,
* INVALID or DELETED state.
*
* @param replicaState State of the container replica.
* @return true if healthy, false otherwise
*/
private boolean isHealthy(final Supplier<State> replicaState) {
return replicaState.get() != State.UNHEALTHY
&& replicaState.get() != State.INVALID
&& replicaState.get() != State.DELETED;
}
/**
* Return ContainerManager.
* @return {@link ContainerManager}
*/
protected ContainerManager getContainerManager() {
return containerManager;
}
protected void deleteReplica(ContainerID containerID, DatanodeDetails dn,
EventPublisher publisher, String reason) {
SCMCommand<?> command = new DeleteContainerCommand(
containerID.getId(), true);
try {
command.setTerm(scmContext.getTermOfLeader());
} catch (NotLeaderException nle) {
logger.warn("Skip sending delete container command," +
" since not leader SCM", nle);
return;
}
publisher.fireEvent(SCMEvents.DATANODE_COMMAND,
new CommandForDatanode<>(dn.getUuid(), command));
logger.info("Sending delete container command for " + reason +
" container {} to datanode {}", containerID.getId(), dn);
}
}