blob: 8be7803a6707810bcd0f94dc4c78dcda60a3b395 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* <p>http://www.apache.org/licenses/LICENSE-2.0
* <p>
* <p>Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.hdds.scm.container;
import java.io.IOException;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.server.events.EventHandler;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.hdds.server.events.IdentifiableEventPayload;
import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.hadoop.hdds.scm.events.SCMEvents.DATANODE_COMMAND;
import static org.apache.hadoop.hdds.scm.events.SCMEvents.CLOSE_CONTAINER_RETRYABLE_REQ;
/**
* In case of a node failure, volume failure, volume out of space, node
* out of space etc, CLOSE_CONTAINER will be triggered.
* CloseContainerEventHandler is the handler for CLOSE_CONTAINER.
* When a close container event is fired, a close command for the container
* should be sent to all the datanodes in the pipeline and containerStateManager
* needs to update the container state to Closing.
*/
public class CloseContainerEventHandler implements EventHandler<ContainerID> {

  public static final Logger LOG =
      LoggerFactory.getLogger(CloseContainerEventHandler.class);

  private final ContainerManager containerManager;

  /**
   * Creates a handler that looks up and updates containers through the
   * given manager.
   *
   * @param containerManager manager used to resolve the container and its
   *                         pipeline and to apply life cycle events.
   */
  public CloseContainerEventHandler(ContainerManager containerManager) {
    this.containerManager = containerManager;
  }

  /**
   * Handles a close request for a single container.
   *
   * <p>The container and its pipeline are looked up first; if the lookup
   * fails or yields no container info, the failure is logged and nothing
   * else happens. Otherwise the action depends on the current state:
   * <ul>
   *   <li>ALLOCATED: the CREATE life cycle event is applied.</li>
   *   <li>CREATING: nothing is done apart from a debug log.</li>
   *   <li>OPEN: the FINALIZE life cycle event is applied and close commands
   *       are fired.</li>
   *   <li>CLOSING: close commands are fired again.</li>
   *   <li>CLOSED, DELETING, DELETED: logged, no action.</li>
   * </ul>
   * No exception escapes this method for an {@link IOException}; such
   * failures are logged at error level.
   *
   * @param containerID id of the container to close.
   * @param publisher   publisher used to fire the follow-up events.
   */
  @Override
  public void onMessage(ContainerID containerID, EventPublisher publisher) {
    LOG.info("Close container Event triggered for container : {}",
        containerID.getId());

    ContainerWithPipeline containerWithPipeline;
    ContainerInfo info;
    try {
      containerWithPipeline =
          containerManager.getContainerWithPipeline(containerID.getId());
      info = containerWithPipeline.getContainerInfo();
      if (info == null) {
        LOG.error("Failed to update the container state. Container with id : {}"
            + " does not exist", containerID.getId());
        return;
      }
    } catch (IOException e) {
      LOG.error("Failed to update the container state. Container with id : {} "
          + "does not exist", containerID.getId(), e);
      return;
    }

    HddsProtos.LifeCycleState state = info.getState();
    try {
      switch (state) {
      case ALLOCATED:
        // We cannot close a container in ALLOCATED state, moving the
        // container to CREATING state, this should eventually
        // timeout and the container will be moved to DELETING state.
        LOG.debug("Closing container {} in {} state", containerID, state);
        containerManager.updateContainerState(containerID.getId(),
            HddsProtos.LifeCycleEvent.CREATE);
        break;
      case CREATING:
        // We cannot close a container in CREATING state, it will eventually
        // timeout and moved to DELETING state.
        LOG.debug("Closing container {} in {} state", containerID, state);
        break;
      case OPEN:
        // Apply FINALIZE before notifying the datanodes; if the state update
        // throws, no close command is sent.
        containerManager.updateContainerState(containerID.getId(),
            HddsProtos.LifeCycleEvent.FINALIZE);
        fireCloseContainerEvents(containerWithPipeline, info, publisher);
        break;
      case CLOSING:
        // No state update is needed here; only the close commands are
        // fired again.
        fireCloseContainerEvents(containerWithPipeline, info, publisher);
        break;
      case CLOSED:
      case DELETING:
      case DELETED:
        LOG.info(
            "container with id : {} is in {} state and need not be closed.",
            containerID.getId(), info.getState());
        break;
      default:
        throw new IOException(
            "Invalid container state for container " + containerID);
      }
    } catch (IOException ex) {
      // The container id is a format argument and the exception is the last
      // argument, so SLF4J fills the placeholder and logs the stack trace.
      LOG.error("Failed to update the container state for container : {}",
          containerID, ex);
    }
  }

  /**
   * Fires one close command per datanode of the container's pipeline and
   * then a retryable close request for the container.
   *
   * @param containerWithPipeline container together with the pipeline whose
   *                              machines receive the close command.
   * @param info                  container info supplying the id, replication
   *                              type and pipeline id for the command.
   * @param publisher             publisher used to fire the events.
   */
  private void fireCloseContainerEvents(
      ContainerWithPipeline containerWithPipeline, ContainerInfo info,
      EventPublisher publisher) {
    ContainerID containerID = info.containerID();

    // A single command instance is shared by all per-datanode events.
    CloseContainerCommand closeContainerCommand =
        new CloseContainerCommand(containerID.getId(),
            info.getReplicationType(), info.getPipelineID());

    Pipeline pipeline = containerWithPipeline.getPipeline();
    pipeline.getMachines().stream()
        .map(datanode ->
            new CommandForDatanode<>(datanode.getUuid(), closeContainerCommand))
        .forEach(command -> publisher.fireEvent(DATANODE_COMMAND, command));

    publisher.fireEvent(CLOSE_CONTAINER_RETRYABLE_REQ,
        new CloseContainerRetryableReq(containerID));

    LOG.trace("Issuing {} on Pipeline {} for container {}",
        closeContainerCommand, pipeline, containerID);
  }

  /**
   * Class to create retryable event. Prevents redundant requests for same
   * container Id.
   */
  public static class CloseContainerRetryableReq implements
      IdentifiableEventPayload {

    private final ContainerID containerID;

    /**
     * @param containerID id of the container the close request refers to.
     */
    public CloseContainerRetryableReq(ContainerID containerID) {
      this.containerID = containerID;
    }

    /**
     * @return the container id this request was created with.
     */
    public ContainerID getContainerID() {
      return containerID;
    }

    /**
     * @return the numeric id of the wrapped container id, used as the
     *         identity of this payload.
     */
    @Override
    public long getId() {
      return containerID.getId();
    }
  }
}