blob: b8e4e897dcaf125a95a4431adbedaa2d2a3385c7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.hdds.scm.container;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest;
import org.apache.hadoop.hdds.scm.container.states.ContainerState;
import org.apache.hadoop.hdds.scm.container.states.ContainerStateMap;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.ozone.common.statemachine
.InvalidStateTransitionException;
import org.apache.hadoop.ozone.common.statemachine.StateMachine;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes
.FAILED_TO_CHANGE_CONTAINER_STATE;
/**
* A container state manager keeps track of container states and returns
* containers that match various queries.
* <p>
* This state machine is driven by a combination of server and client actions.
* <p>
* This is how a create container happens: 1. When a container is created, the
* Server(or SCM) marks that Container as ALLOCATED state. In this state, SCM
* has chosen a pipeline for container to live on. However, the container is not
* created yet. This container along with the pipeline is returned to the
* client.
* <p>
* 2. The client when it sees the Container state as ALLOCATED understands that
* container needs to be created on the specified pipeline. The client lets the
* SCM know that saw this flag and is initiating the on the data nodes.
* <p>
* This is done by calling into notifyObjectCreation(ContainerName,
* BEGIN_CREATE) flag. When SCM gets this call, SCM puts the container state
* into CREATING. All this state means is that SCM told Client to create a
* container and client saw that request.
* <p>
* 3. Then client makes calls to datanodes directly, asking the datanodes to
* create the container. This is done with the help of pipeline that supports
* this container.
* <p>
* 4. Once the creation of the container is complete, the client will make
* another call to the SCM, this time specifying the containerName and the
* COMPLETE_CREATE as the Event.
* <p>
* 5. With COMPLETE_CREATE event, the container moves to an Open State. This is
* the state when clients can write to a container.
* <p>
* 6. If the client does not respond with the COMPLETE_CREATE event with a
* certain time, the state machine times out and triggers a delete operation of
* the container.
* <p>
* Please see the function initializeStateMachine below to see how this looks in
* code.
* <p>
* Reusing existing container :
* <p>
* The create container call is not made all the time, the system tries to use
* open containers as much as possible. So in those cases, it looks thru the
* list of open containers and will return containers that match the specific
* signature.
* <p>
* Please note : Logically there are 3 separate state machines in the case of
* containers.
* <p>
* The Create State Machine -- Commented extensively above.
* <p>
* Open/Close State Machine - Once the container is in the Open State,
* eventually it will be closed, once sufficient data has been written to it.
* <p>
* TimeOut Delete Container State Machine - if the container creating times out,
* then Container State manager decides to delete the container.
*/
public class ContainerStateManager implements Closeable {
private static final Logger LOG =
LoggerFactory.getLogger(ContainerStateManager.class);
private final StateMachine<HddsProtos.LifeCycleState,
HddsProtos.LifeCycleEvent> stateMachine;
private final long containerSize;
private final ConcurrentHashMap<ContainerState, ContainerID> lastUsedMap;
private final ContainerStateMap containers;
private final AtomicLong containerCount;
/**
* Constructs a Container State Manager that tracks all containers owned by
* SCM for the purpose of allocation of blocks.
* <p>
* TODO : Add Container Tags so we know which containers are owned by SCM.
*/
@SuppressWarnings("unchecked")
public ContainerStateManager(Configuration configuration,
ContainerManager containerManager, PipelineSelector pipelineSelector) {
// Initialize the container state machine.
Set<HddsProtos.LifeCycleState> finalStates = new HashSet();
// These are the steady states of a container.
finalStates.add(LifeCycleState.OPEN);
finalStates.add(LifeCycleState.CLOSED);
finalStates.add(LifeCycleState.DELETED);
this.stateMachine = new StateMachine<>(LifeCycleState.ALLOCATED,
finalStates);
initializeStateMachine();
this.containerSize = (long) configuration.getStorageSize(
ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT,
StorageUnit.BYTES);
lastUsedMap = new ConcurrentHashMap<>();
containerCount = new AtomicLong(0);
containers = new ContainerStateMap();
loadExistingContainers(containerManager, pipelineSelector);
}
private void loadExistingContainers(ContainerManager containerManager,
PipelineSelector pipelineSelector) {
List<ContainerInfo> containerList;
try {
containerList = containerManager.listContainer(0, Integer.MAX_VALUE);
// if there are no container to load, let us return.
if (containerList == null || containerList.size() == 0) {
LOG.info("No containers to load for this cluster.");
return;
}
} catch (IOException e) {
if (!e.getMessage().equals("No container exists in current db")) {
LOG.error("Could not list the containers", e);
}
return;
}
try {
long maxID = 0;
for (ContainerInfo container : containerList) {
containers.addContainer(container);
pipelineSelector.addContainerToPipeline(
container.getPipelineID(), container.getContainerID());
if (maxID < container.getContainerID()) {
maxID = container.getContainerID();
}
containerCount.set(maxID);
}
} catch (SCMException ex) {
LOG.error("Unable to create a container information. ", ex);
// Fix me, what is the proper shutdown procedure for SCM ??
// System.exit(1) // Should we exit here?
}
}
/**
* Return the info of all the containers kept by the in-memory mapping.
*
* @return the list of all container info.
*/
public List<ContainerInfo> getAllContainers() {
List<ContainerInfo> list = new ArrayList<>();
//No Locking needed since the return value is an immutable map.
containers.getContainerMap().forEach((key, value) -> list.add(value));
return list;
}
/*
*
* Event and State Transition Mapping:
*
* State: ALLOCATED ---------------> CREATING
* Event: CREATE
*
* State: CREATING ---------------> OPEN
* Event: CREATED
*
* State: OPEN ---------------> CLOSING
* Event: FINALIZE
*
* State: CLOSING ---------------> CLOSED
* Event: CLOSE
*
* State: CLOSED ----------------> DELETING
* Event: DELETE
*
* State: DELETING ----------------> DELETED
* Event: CLEANUP
*
* State: CREATING ---------------> DELETING
* Event: TIMEOUT
*
*
* Container State Flow:
*
* [ALLOCATED]---->[CREATING]------>[OPEN]-------->[CLOSING]------->[CLOSED]
* (CREATE) | (CREATED) (FINALIZE) (CLOSE) |
* | |
* | |
* |(TIMEOUT) (DELETE)|
* | |
* +-------------> [DELETING] <-------------------+
* |
* |
* (CLEANUP)|
* |
* [DELETED]
*/
private void initializeStateMachine() {
stateMachine.addTransition(LifeCycleState.ALLOCATED,
LifeCycleState.CREATING,
LifeCycleEvent.CREATE);
stateMachine.addTransition(LifeCycleState.CREATING,
LifeCycleState.OPEN,
LifeCycleEvent.CREATED);
stateMachine.addTransition(LifeCycleState.OPEN,
LifeCycleState.CLOSING,
LifeCycleEvent.FINALIZE);
stateMachine.addTransition(LifeCycleState.CLOSING,
LifeCycleState.CLOSED,
LifeCycleEvent.CLOSE);
stateMachine.addTransition(LifeCycleState.CLOSED,
LifeCycleState.DELETING,
LifeCycleEvent.DELETE);
stateMachine.addTransition(LifeCycleState.CREATING,
LifeCycleState.DELETING,
LifeCycleEvent.TIMEOUT);
stateMachine.addTransition(LifeCycleState.DELETING,
LifeCycleState.DELETED,
LifeCycleEvent.CLEANUP);
}
/**
* allocates a new container based on the type, replication etc.
*
* @param selector -- Pipeline selector class.
* @param type -- Replication type.
* @param replicationFactor - Replication replicationFactor.
* @return ContainerWithPipeline
* @throws IOException on Failure.
*/
public ContainerWithPipeline allocateContainer(PipelineSelector selector,
HddsProtos.ReplicationType type,
HddsProtos.ReplicationFactor replicationFactor, String owner)
throws IOException {
Pipeline pipeline = selector.getReplicationPipeline(type,
replicationFactor);
Preconditions.checkNotNull(pipeline, "Pipeline type=%s/"
+ "replication=%s couldn't be found for the new container. "
+ "Do you have enough nodes?", type, replicationFactor);
long containerID = containerCount.incrementAndGet();
ContainerInfo containerInfo = new ContainerInfo.Builder()
.setState(HddsProtos.LifeCycleState.ALLOCATED)
.setPipelineID(pipeline.getId())
// This is bytes allocated for blocks inside container, not the
// container size
.setAllocatedBytes(0)
.setUsedBytes(0)
.setNumberOfKeys(0)
.setStateEnterTime(Time.monotonicNow())
.setOwner(owner)
.setContainerID(containerID)
.setDeleteTransactionId(0)
.setReplicationFactor(replicationFactor)
.setReplicationType(pipeline.getType())
.build();
selector.addContainerToPipeline(pipeline.getId(), containerID);
Preconditions.checkNotNull(containerInfo);
containers.addContainer(containerInfo);
LOG.trace("New container allocated: {}", containerInfo);
return new ContainerWithPipeline(containerInfo, pipeline);
}
/**
* Update the Container State to the next state.
*
* @param info - ContainerInfo
* @param event - LifeCycle Event
* @return Updated ContainerInfo.
* @throws SCMException on Failure.
*/
public ContainerInfo updateContainerState(ContainerInfo
info, HddsProtos.LifeCycleEvent event) throws SCMException {
LifeCycleState newState;
try {
newState = this.stateMachine.getNextState(info.getState(), event);
} catch (InvalidStateTransitionException ex) {
String error = String.format("Failed to update container state %s, " +
"reason: invalid state transition from state: %s upon " +
"event: %s.",
info.getContainerID(), info.getState(), event);
LOG.error(error);
throw new SCMException(error, FAILED_TO_CHANGE_CONTAINER_STATE);
}
// This is a post condition after executing getNextState.
Preconditions.checkNotNull(newState);
containers.updateState(info, info.getState(), newState);
return containers.getContainerInfo(info);
}
/**
* Update the container State.
* @param info - Container Info
* @return ContainerInfo
* @throws SCMException - on Error.
*/
public ContainerInfo updateContainerInfo(ContainerInfo info)
throws SCMException {
containers.updateContainerInfo(info);
return containers.getContainerInfo(info);
}
/**
* Update deleteTransactionId for a container.
*
* @param deleteTransactionMap maps containerId to its new
* deleteTransactionID
*/
public void updateDeleteTransactionId(Map<Long, Long> deleteTransactionMap) {
for (Map.Entry<Long, Long> entry : deleteTransactionMap.entrySet()) {
containers.getContainerMap().get(ContainerID.valueof(entry.getKey()))
.updateDeleteTransactionId(entry.getValue());
}
}
/**
* Return a container matching the attributes specified.
*
* @param size - Space needed in the Container.
* @param owner - Owner of the container - A specific nameservice.
* @param type - Replication Type {StandAlone, Ratis}
* @param factor - Replication Factor {ONE, THREE}
* @param state - State of the Container-- {Open, Allocated etc.}
* @return ContainerInfo, null if there is no match found.
*/
public ContainerInfo getMatchingContainer(final long size,
String owner, ReplicationType type, ReplicationFactor factor,
LifeCycleState state) {
// Find containers that match the query spec, if no match return null.
NavigableSet<ContainerID> matchingSet =
containers.getMatchingContainerIDs(state, owner, factor, type);
if (matchingSet == null || matchingSet.size() == 0) {
return null;
}
// Get the last used container and find container above the last used
// container ID.
ContainerState key = new ContainerState(owner, type, factor);
ContainerID lastID = lastUsedMap.get(key);
if (lastID == null) {
lastID = matchingSet.first();
}
// There is a small issue here. The first time, we will skip the first
// container. But in most cases it will not matter.
NavigableSet<ContainerID> resultSet = matchingSet.tailSet(lastID, false);
if (resultSet.size() == 0) {
resultSet = matchingSet;
}
ContainerInfo selectedContainer =
findContainerWithSpace(size, resultSet, owner);
if (selectedContainer == null) {
// If we did not find any space in the tailSet, we need to look for
// space in the headset, we need to pass true to deal with the
// situation that we have a lone container that has space. That is we
// ignored the last used container under the assumption we can find
// other containers with space, but if have a single container that is
// not true. Hence we need to include the last used container as the
// last element in the sorted set.
resultSet = matchingSet.headSet(lastID, true);
selectedContainer = findContainerWithSpace(size, resultSet, owner);
}
// Update the allocated Bytes on this container.
if (selectedContainer != null) {
selectedContainer.updateAllocatedBytes(size);
}
return selectedContainer;
}
private ContainerInfo findContainerWithSpace(long size,
NavigableSet<ContainerID> searchSet, String owner) {
// Get the container with space to meet our request.
for (ContainerID id : searchSet) {
ContainerInfo containerInfo = containers.getContainerInfo(id);
if (containerInfo.getAllocatedBytes() + size <= this.containerSize) {
containerInfo.updateLastUsedTime();
ContainerState key = new ContainerState(owner,
containerInfo.getReplicationType(),
containerInfo.getReplicationFactor());
lastUsedMap.put(key, containerInfo.containerID());
return containerInfo;
}
}
return null;
}
/**
* Returns a set of ContainerIDs that match the Container.
*
* @param owner Owner of the Containers.
* @param type - Replication Type of the containers
* @param factor - Replication factor of the containers.
* @param state - Current State, like Open, Close etc.
* @return Set of containers that match the specific query parameters.
*/
public NavigableSet<ContainerID> getMatchingContainerIDs(
String owner, ReplicationType type, ReplicationFactor factor,
LifeCycleState state) {
return containers.getMatchingContainerIDs(state, owner,
factor, type);
}
/**
* Returns the containerInfo with pipeline for the given container id.
* @param selector -- Pipeline selector class.
* @param containerID id of the container
* @return ContainerInfo containerInfo
* @throws IOException
*/
public ContainerWithPipeline getContainer(PipelineSelector selector,
ContainerID containerID) {
ContainerInfo info = containers.getContainerInfo(containerID.getId());
Pipeline pipeline = selector.getPipeline(info.getPipelineID());
return new ContainerWithPipeline(info, pipeline);
}
/**
* Returns the containerInfo for the given container id.
* @param containerID id of the container
* @return ContainerInfo containerInfo
* @throws IOException
*/
public ContainerInfo getContainer(ContainerID containerID) {
return containers.getContainerInfo(containerID);
}
@Override
public void close() throws IOException {
}
/**
* Returns the latest list of DataNodes where replica for given containerId
* exist. Throws an SCMException if no entry is found for given containerId.
*
* @param containerID
* @return Set<DatanodeDetails>
*/
public Set<DatanodeDetails> getContainerReplicas(ContainerID containerID)
throws SCMException {
return containers.getContainerReplicas(containerID);
}
/**
* Add a container Replica for given DataNode.
*
* @param containerID
* @param dn
*/
public void addContainerReplica(ContainerID containerID, DatanodeDetails dn) {
containers.addContainerReplica(containerID, dn);
}
/**
* Remove a container Replica for given DataNode.
*
* @param containerID
* @param dn
* @return True of dataNode is removed successfully else false.
*/
public boolean removeContainerReplica(ContainerID containerID,
DatanodeDetails dn) throws SCMException {
return containers.removeContainerReplica(containerID, dn);
}
/**
* Compare the existing replication number with the expected one.
*/
public ReplicationRequest checkReplicationState(ContainerID containerID)
throws SCMException {
int existingReplicas = getContainerReplicas(containerID).size();
int expectedReplicas = getContainer(containerID)
.getReplicationFactor().getNumber();
if (existingReplicas != expectedReplicas) {
return new ReplicationRequest(containerID.getId(), existingReplicas,
expectedReplicas);
}
return null;
}
/**
* Checks if the container is open.
*/
public boolean isOpen(ContainerID containerID) {
Preconditions.checkNotNull(containerID);
ContainerInfo container = Preconditions
.checkNotNull(getContainer(containerID),
"Container can't be found " + containerID);
return container.isContainerOpen();
}
@VisibleForTesting
public ContainerStateMap getContainerStateMap() {
return containers;
}
}