blob: f8ffc0253316c7c17a97094ee3a0cf0e4d4cd590 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.hdds.scm.container;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.metrics.SCMContainerManagerMetrics;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
import org.apache.hadoop.hdds.utils.db.BatchOperation;
import org.apache.hadoop.hdds.utils.db.BatchOperationHandler;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.hdds.utils.db.Table.KeyValue;
import org.apache.hadoop.hdds.utils.db.TableIterator;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER;
/**
 * SCM's {@link ContainerManager} implementation. Maintains the mapping from
 * containers to pipelines, keeping the in-memory view held by
 * {@link ContainerStateManager} in sync with the persistent container store.
 * Used by SCM when allocating new container locations and when looking up a
 * key.
 */
public class SCMContainerManager implements ContainerManager {
  private static final Logger LOG = LoggerFactory.getLogger(
      SCMContainerManager.class);

  // Guards every read-modify-write of the in-memory container state so it
  // stays consistent with the persistent container store.
  private final Lock lock;
  private final PipelineManager pipelineManager;
  // In-memory view of all containers and their life-cycle states.
  private final ContainerStateManager containerStateManager;
  // Target number of open containers per healthy volume on a datanode.
  private final int numContainerPerVolume;
  private final SCMContainerManagerMetrics scmContainerManagerMetrics;
  // Persistent ContainerID -> ContainerInfo mapping.
  private Table<ContainerID, ContainerInfo> containerStore;
  // Used to group several container-store writes into one atomic batch.
  private BatchOperationHandler batchHandler;

  /**
   * Constructs a mapping class that creates mapping between container names
   * and pipelines. Existing containers are loaded from the container store
   * into memory, and containers still in OPEN state are re-attached to their
   * pipelines (or finalized if the pipeline no longer exists).
   *
   * @param conf - {@link ConfigurationSource}
   * @param containerStore - persistent ContainerID to ContainerInfo table
   * @param batchHandler - handler used for batched container-store writes
   * @param pipelineManager - {@link PipelineManager}
   * @throws IOException on Failure.
   */
  public SCMContainerManager(
      final ConfigurationSource conf,
      Table<ContainerID, ContainerInfo> containerStore,
      BatchOperationHandler batchHandler,
      PipelineManager pipelineManager)
      throws IOException {
    this.batchHandler = batchHandler;
    this.containerStore = containerStore;
    this.lock = new ReentrantLock();
    this.pipelineManager = pipelineManager;
    this.containerStateManager = new ContainerStateManager(conf);
    this.numContainerPerVolume = conf
        .getInt(ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT,
            ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT_DEFAULT);
    loadExistingContainers();
    scmContainerManagerMetrics = SCMContainerManagerMetrics.create();
  }

  /**
   * Returns how many open containers a single pipeline should carry, derived
   * from the per-volume container target and the pipeline's datanode
   * volume/pipeline limits.
   */
  private int getOpenContainerCountPerPipeline(Pipeline pipeline) {
    int minContainerCountPerDn = numContainerPerVolume *
        pipelineManager.minHealthyVolumeNum(pipeline);
    int minPipelineCountPerDn = pipelineManager.minPipelineLimit(pipeline);
    return (int) Math.ceil(
        ((double) minContainerCountPerDn / minPipelineCountPerDn));
  }

  /**
   * Loads all containers from the container store into the in-memory state,
   * re-attaching OPEN containers to their pipelines. An OPEN container whose
   * pipeline no longer exists is finalized (moved towards CLOSED).
   *
   * @throws IOException if the store cannot be read or a state update fails.
   */
  private void loadExistingContainers() throws IOException {
    TableIterator<ContainerID, ? extends KeyValue<ContainerID, ContainerInfo>>
        iterator = containerStore.iterator();
    while (iterator.hasNext()) {
      ContainerInfo container = iterator.next().getValue();
      Preconditions.checkNotNull(container);
      containerStateManager.loadContainer(container);
      try {
        if (container.getState() == LifeCycleState.OPEN) {
          pipelineManager.addContainerToPipeline(container.getPipelineID(),
              ContainerID.valueof(container.getContainerID()));
        }
      } catch (PipelineNotFoundException ex) {
        LOG.warn("Found a Container {} which is in {} state with pipeline {} " +
            "that does not exist. Closing Container.", container,
            container.getState(), container.getPipelineID());
        // Skip the pipeline-to-container removal: the pipeline is already
        // gone, so there is nothing to detach.
        updateContainerState(container.containerID(),
            HddsProtos.LifeCycleEvent.FINALIZE, true);
      }
    }
  }

  @VisibleForTesting
  public ContainerStateManager getContainerStateManager() {
    return containerStateManager;
  }

  @Override
  public Set<ContainerID> getContainerIDs() {
    lock.lock();
    try {
      return containerStateManager.getAllContainerIDs();
    } finally {
      lock.unlock();
    }
  }

  @Override
  public List<ContainerInfo> getContainers() {
    lock.lock();
    try {
      return containerStateManager.getAllContainerIDs().stream().map(id -> {
        try {
          return containerStateManager.getContainer(id);
        } catch (ContainerNotFoundException e) {
          // Should not happen while the lock is held; drop the id.
          return null;
        }
      }).filter(Objects::nonNull).collect(Collectors.toList());
    } finally {
      lock.unlock();
    }
  }

  @Override
  public List<ContainerInfo> getContainers(LifeCycleState state) {
    lock.lock();
    try {
      return containerStateManager.getContainerIDsByState(state).stream()
          .map(id -> {
            try {
              return containerStateManager.getContainer(id);
            } catch (ContainerNotFoundException e) {
              // Should not happen while the lock is held; drop the id.
              return null;
            }
          }).filter(Objects::nonNull).collect(Collectors.toList());
    } finally {
      lock.unlock();
    }
  }

  /**
   * Get number of containers in the given state.
   *
   * @param state {@link LifeCycleState}
   * @return Count
   */
  public Integer getContainerCountByState(LifeCycleState state) {
    return containerStateManager.getContainerCountByState(state);
  }

  /**
   * {@inheritDoc}
   */
  @Override
  public ContainerInfo getContainer(final ContainerID containerID)
      throws ContainerNotFoundException {
    return containerStateManager.getContainer(containerID);
  }

  @Override
  public boolean exists(ContainerID containerID) {
    lock.lock();
    try {
      return (containerStateManager.getContainer(containerID) != null);
    } catch (ContainerNotFoundException e) {
      return false;
    } finally {
      lock.unlock();
    }
  }

  /**
   * {@inheritDoc}
   */
  @Override
  public List<ContainerInfo> listContainer(ContainerID startContainerID,
      int count) {
    lock.lock();
    try {
      scmContainerManagerMetrics.incNumListContainersOps();
      // A null start id means "list from the beginning".
      final long startId = startContainerID == null ?
          0 : startContainerID.getId();
      final List<ContainerID> containersIds =
          new ArrayList<>(containerStateManager.getAllContainerIDs());
      Collections.sort(containersIds);
      return containersIds.stream()
          .filter(id -> id.getId() > startId)
          .limit(count)
          .map(id -> {
            try {
              return containerStateManager.getContainer(id);
            } catch (ContainerNotFoundException ex) {
              // This can never happen, as we hold lock no one else can remove
              // the container after we got the container ids.
              LOG.warn("Container Missing.", ex);
              return null;
            }
          }).collect(Collectors.toList());
    } finally {
      lock.unlock();
    }
  }

  /**
   * Allocates a new container.
   *
   * @param type - replication type of the container.
   * @param replicationFactor - replication factor of the container.
   * @param owner - The string name of the Service that owns this container.
   * @return - Pipeline that makes up this container.
   * @throws IOException - Exception
   */
  @Override
  public ContainerInfo allocateContainer(final ReplicationType type,
      final ReplicationFactor replicationFactor, final String owner)
      throws IOException {
    // Acquire the lock *before* entering try, so that a failed acquisition
    // cannot reach the finally block and call unlock() on an unheld lock.
    lock.lock();
    try {
      ContainerInfo containerInfo = null;
      try {
        containerInfo =
            containerStateManager.allocateContainer(pipelineManager, type,
                replicationFactor, owner);
      } catch (IOException ex) {
        scmContainerManagerMetrics.incNumFailureCreateContainers();
        throw ex;
      }
      // Add container to DB.
      try {
        addContainerToDB(containerInfo);
      } catch (IOException ex) {
        // When adding to DB failed, we are removing from containerStateMap.
        // We should also remove from pipeline2Container Map in
        // PipelineStateManager.
        pipelineManager.removeContainerFromPipeline(
            containerInfo.getPipelineID(),
            new ContainerID(containerInfo.getContainerID()));
        throw ex;
      }
      return containerInfo;
    } finally {
      lock.unlock();
    }
  }

  /**
   * Deletes a container from SCM.
   *
   * @param containerID - Container ID
   * @throws IOException if container doesn't exist or container store failed
   *                     to delete the specified key.
   */
  @Override
  public void deleteContainer(ContainerID containerID) throws IOException {
    lock.lock();
    try {
      // Remove from the in-memory state first; throws if unknown.
      containerStateManager.removeContainer(containerID);
      if (containerStore.get(containerID) != null) {
        containerStore.delete(containerID);
      } else {
        // In-memory state and the store disagree; the store entry is already
        // gone. Log it so the inconsistency is visible.
        LOG.warn("Unable to remove the container {} from container store," +
                " it's missing!", containerID);
      }
      scmContainerManagerMetrics.incNumSuccessfulDeleteContainers();
    } catch (ContainerNotFoundException cnfe) {
      scmContainerManagerMetrics.incNumFailureDeleteContainers();
      throw new SCMException(
          "Failed to delete container " + containerID + ", reason : " +
              "container doesn't exist.",
          FAILED_TO_FIND_CONTAINER);
    } finally {
      lock.unlock();
    }
  }

  /**
   * {@inheritDoc} Used by client to update container state on SCM.
   */
  @Override
  public HddsProtos.LifeCycleState updateContainerState(
      ContainerID containerID, HddsProtos.LifeCycleEvent event)
      throws IOException {
    // Should we return the updated ContainerInfo instead of LifeCycleState?
    return updateContainerState(containerID, event, false);
  }

  /**
   * Applies a life-cycle event to a container, persists the resulting state
   * and, unless skipped, detaches the container from its pipeline when it
   * leaves the OPEN state.
   *
   * @param containerID container to update.
   * @param event life-cycle event to apply.
   * @param skipPipelineToContainerRemove true when the caller already knows
   *        the pipeline is gone (e.g. during startup recovery).
   * @return the container's new {@link LifeCycleState}.
   * @throws IOException if the container is unknown or the store update fails.
   */
  private HddsProtos.LifeCycleState updateContainerState(
      ContainerID containerID, HddsProtos.LifeCycleEvent event,
      boolean skipPipelineToContainerRemove)
      throws IOException {
    // Should we return the updated ContainerInfo instead of LifeCycleState?
    lock.lock();
    try {
      final ContainerInfo container = containerStateManager
          .getContainer(containerID);
      final LifeCycleState oldState = container.getState();
      containerStateManager.updateContainerState(containerID, event);
      final LifeCycleState newState = container.getState();
      if (!skipPipelineToContainerRemove) {
        if (oldState == LifeCycleState.OPEN &&
            newState != LifeCycleState.OPEN) {
          try {
            pipelineManager
                .removeContainerFromPipeline(container.getPipelineID(),
                    containerID);
          } catch (PipelineNotFoundException e) {
            LOG.warn("Unable to remove container {} from pipeline {} " +
                    " as the pipeline no longer exists",
                containerID, container.getPipelineID());
          }
        }
      }
      if (newState == LifeCycleState.DELETED) {
        containerStore.delete(containerID);
      } else {
        containerStore.put(containerID, container);
      }
      return newState;
    } catch (ContainerNotFoundException cnfe) {
      throw new SCMException(
          "Failed to update container state "
              + containerID
              + ", reason : container doesn't exist.",
          FAILED_TO_FIND_CONTAINER);
    } finally {
      lock.unlock();
    }
  }

  /**
   * Update deleteTransactionId according to deleteTransactionMap.
   *
   * @param deleteTransactionMap Maps the containerId to latest delete
   *                             transaction id for the container.
   * @throws IOException
   */
  public void updateDeleteTransactionId(Map<Long, Long> deleteTransactionMap)
      throws IOException {
    if (deleteTransactionMap == null) {
      return;
    }
    lock.lock();
    // All store updates go into one batch so they commit atomically.
    try (BatchOperation batchOperation = batchHandler.initBatchOperation()) {
      for (Map.Entry<Long, Long> entry : deleteTransactionMap.entrySet()) {
        long containerID = entry.getKey();
        ContainerID containerIdObject = new ContainerID(containerID);
        ContainerInfo containerInfo =
            containerStore.get(containerIdObject);
        try {
          ContainerInfo containerInfoInMem = containerStateManager
              .getContainer(containerIdObject);
          if (containerInfo == null || containerInfoInMem == null) {
            throw new SCMException("Failed to increment number of deleted " +
                "blocks for container " + containerID + ", reason : " +
                "container doesn't exist.", FAILED_TO_FIND_CONTAINER);
          }
          containerInfo.updateDeleteTransactionId(entry.getValue());
          // Carry over the freshest usage stats from the in-memory copy so
          // the persisted record doesn't regress them.
          containerInfo.setNumberOfKeys(containerInfoInMem.getNumberOfKeys());
          containerInfo.setUsedBytes(containerInfoInMem.getUsedBytes());
          containerStore.putWithBatch(batchOperation, containerIdObject,
              containerInfo);
        } catch (ContainerNotFoundException ex) {
          // Container is not present therefore we don't need to update
          // transaction id for this container.
          LOG.warn("Failed to update the transaction Id as container: {} for"
                  + " transaction: {} does not exist",
              containerID, entry.getValue(), ex);
        }
      }
      batchHandler.commitBatchOperation(batchOperation);
      containerStateManager.updateDeleteTransactionId(deleteTransactionMap);
    } finally {
      lock.unlock();
    }
  }

  /**
   * Return a container matching the attributes specified.
   *
   * @param sizeRequired - Space needed in the Container.
   * @param owner - Owner of the container - A specific nameservice.
   * @param pipeline - Pipeline to which the container should belong.
   * @return ContainerInfo, null if there is no match found.
   */
  public ContainerInfo getMatchingContainer(final long sizeRequired,
      String owner, Pipeline pipeline) {
    return getMatchingContainer(sizeRequired, owner, pipeline,
        Collections.emptySet());
  }

  /**
   * Return a container matching the attributes specified, excluding the given
   * containers. Allocates a fresh container on the pipeline when it is below
   * its open-container quota or when no existing container matches.
   *
   * @param sizeRequired - Space needed in the Container.
   * @param owner - Owner of the container - A specific nameservice.
   * @param pipeline - Pipeline to which the container should belong.
   * @param excludedContainers - containers that must not be returned.
   * @return ContainerInfo, null if there is no match found.
   */
  @SuppressWarnings("squid:S2445")
  public ContainerInfo getMatchingContainer(final long sizeRequired,
      String owner, Pipeline pipeline,
      Collection<ContainerID> excludedContainers) {
    NavigableSet<ContainerID> containerIDs;
    ContainerInfo containerInfo;
    try {
      // Serialize allocation decisions per pipeline so concurrent callers
      // don't overshoot the pipeline's open-container quota.
      synchronized (pipeline) {
        containerIDs = getContainersForOwner(pipeline, owner);
        if (containerIDs.size() < getOpenContainerCountPerPipeline(pipeline)) {
          containerInfo =
              containerStateManager.allocateContainer(
                  pipelineManager, owner, pipeline);
          // Add to DB
          addContainerToDB(containerInfo);
        } else {
          containerIDs.removeAll(excludedContainers);
          containerInfo =
              containerStateManager.getMatchingContainer(
                  sizeRequired, owner, pipeline.getId(), containerIDs);
          if (containerInfo == null) {
            containerInfo =
                containerStateManager.
                    allocateContainer(pipelineManager, owner,
                        pipeline);
            // Add to DB
            addContainerToDB(containerInfo);
          }
        }
        containerStateManager.updateLastUsedMap(pipeline.getId(),
            containerInfo.containerID(), owner);
        // TODO: #CLUTIL cleanup entries in lastUsedMap
        return containerInfo;
      }
    } catch (Exception e) {
      LOG.warn("Container allocation failed for pipeline={} requiredSize={}.",
          pipeline, sizeRequired, e);
      return null;
    }
  }

  /**
   * Add newly allocated container to container DB. On failure the container is
   * rolled back out of the in-memory state so memory and store stay in sync.
   *
   * @param containerInfo container to persist.
   * @throws IOException if the store write fails.
   */
  protected void addContainerToDB(ContainerInfo containerInfo)
      throws IOException {
    try {
      containerStore
          .put(new ContainerID(containerInfo.getContainerID()), containerInfo);
      // Incrementing here, as allocateBlock to create a container calls
      // getMatchingContainer() and finally calls this API to add newly
      // created container to DB.
      // Even allocateContainer calls this API to add newly allocated
      // container to DB. So we need to increment metrics here.
      scmContainerManagerMetrics.incNumSuccessfulCreateContainers();
    } catch (IOException ex) {
      // If adding to containerStore fails, we should remove the container
      // from in-memory map.
      scmContainerManagerMetrics.incNumFailureCreateContainers();
      // Log the cause as well, otherwise the stack trace of the store
      // failure is lost here.
      LOG.error("Add Container to DB failed for ContainerID #{}",
          containerInfo.getContainerID(), ex);
      try {
        containerStateManager.removeContainer(containerInfo.containerID());
      } catch (ContainerNotFoundException cnfe) {
        // This should not happen, as we are removing after adding in to
        // container state map.
      }
      throw ex;
    }
  }

  /**
   * Returns the container ID's matching with specified owner.
   *
   * @param pipeline pipeline whose containers are inspected.
   * @param owner owner to match.
   * @return NavigableSet&lt;ContainerID&gt; of containers owned by owner.
   */
  private NavigableSet<ContainerID> getContainersForOwner(
      Pipeline pipeline, String owner) throws IOException {
    NavigableSet<ContainerID> containerIDs =
        pipelineManager.getContainersInPipeline(pipeline.getId());
    // Drop containers with a different owner or missing container info;
    // use the iterator so removal during traversal is safe.
    Iterator<ContainerID> containerIDIterator = containerIDs.iterator();
    while (containerIDIterator.hasNext()) {
      ContainerID cid = containerIDIterator.next();
      try {
        if (!getContainer(cid).getOwner().equals(owner)) {
          containerIDIterator.remove();
        }
      } catch (ContainerNotFoundException e) {
        LOG.error("Could not find container info for container id={}.", cid,
            e);
        containerIDIterator.remove();
      }
    }
    return containerIDs;
  }

  /**
   * Returns the latest list of DataNodes where replica for given containerId
   * exist. Throws an SCMException if no entry is found for given containerId.
   *
   * @param containerID container whose replicas are requested.
   * @return Set&lt;ContainerReplica&gt; for the container.
   */
  public Set<ContainerReplica> getContainerReplicas(
      final ContainerID containerID) throws ContainerNotFoundException {
    return containerStateManager.getContainerReplicas(containerID);
  }

  /**
   * Add a container Replica for given DataNode.
   *
   * @param containerID container the replica belongs to.
   * @param replica replica to add or update.
   */
  public void updateContainerReplica(final ContainerID containerID,
      final ContainerReplica replica) throws ContainerNotFoundException {
    containerStateManager.updateContainerReplica(containerID, replica);
  }

  /**
   * Remove a container Replica for given DataNode.
   *
   * @param containerID container the replica belongs to.
   * @param replica replica to remove.
   */
  public void removeContainerReplica(final ContainerID containerID,
      final ContainerReplica replica)
      throws ContainerNotFoundException, ContainerReplicaNotFoundException {
    containerStateManager.removeContainerReplica(containerID, replica);
  }

  /**
   * Closes this stream and releases any system resources associated with it.
   * If the stream is already closed then invoking this method has no effect.
   *
   * <p>As noted in {@link AutoCloseable#close()}, cases where the close may
   * fail require careful attention. It is strongly advised to relinquish the
   * underlying resources and to internally <em>mark</em> the {@code Closeable}
   * as closed, prior to throwing the {@code IOException}.
   *
   * @throws IOException if an I/O error occurs
   */
  @Override
  public void close() throws IOException {
    if (containerStateManager != null) {
      containerStateManager.close();
    }
    if (scmContainerManagerMetrics != null) {
      this.scmContainerManagerMetrics.unRegister();
    }
  }

  /**
   * Records metrics for a processed container report.
   *
   * @param isFullReport true for a full container report, false for an
   *        incremental (ICR) report.
   * @param success whether the report was processed successfully.
   */
  public void notifyContainerReportProcessing(boolean isFullReport,
      boolean success) {
    if (isFullReport) {
      if (success) {
        scmContainerManagerMetrics.incNumContainerReportsProcessedSuccessful();
      } else {
        scmContainerManagerMetrics.incNumContainerReportsProcessedFailed();
      }
    } else {
      if (success) {
        scmContainerManagerMetrics.incNumICRReportsProcessedSuccessful();
      } else {
        scmContainerManagerMetrics.incNumICRReportsProcessedFailed();
      }
    }
  }

  protected PipelineManager getPipelineManager() {
    return pipelineManager;
  }

  public Lock getLock() {
    return lock;
  }

  @VisibleForTesting
  public Table<ContainerID, ContainerInfo> getContainerStore() {
    return this.containerStore;
  }
}