blob: df26e3608a44f9b351174dae5955377e196e13f2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* <p>http://www.apache.org/licenses/LICENSE-2.0
* <p>
* <p>Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.hdds.scm.container;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.primitives.Longs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.SCMContainerInfo;
import org.apache.hadoop.hdds.scm.block.PendingDeleteStatusList;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.lease.Lease;
import org.apache.hadoop.ozone.lease.LeaseException;
import org.apache.hadoop.ozone.lease.LeaseManager;
import org.apache.hadoop.utils.BatchOperation;
import org.apache.hadoop.utils.MetadataStore;
import org.apache.hadoop.utils.MetadataStoreBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys
.OZONE_SCM_CONTAINER_SIZE_DEFAULT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys
.OZONE_SCM_CONTAINER_SIZE;
import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes
.FAILED_TO_CHANGE_CONTAINER_STATE;
import static org.apache.hadoop.hdds.server.ServerUtils.getOzoneMetaDirPath;
import static org.apache.hadoop.ozone.OzoneConsts.SCM_CONTAINER_DB;
/**
* ContainerManager class contains the mapping from a name to a pipeline
* mapping. This is used by SCM when allocating new locations and when
* looking up a key.
*/
public class SCMContainerManager implements ContainerManager {
private static final Logger LOG = LoggerFactory.getLogger(SCMContainerManager
.class);
private final NodeManager nodeManager;
private final long cacheSize;
private final Lock lock;
private final Charset encoding = Charset.forName("UTF-8");
private final MetadataStore containerStore;
private final PipelineSelector pipelineSelector;
private final ContainerStateManager containerStateManager;
private final LeaseManager<ContainerInfo> containerLeaseManager;
private final EventPublisher eventPublisher;
private final long size;
/**
* Constructs a mapping class that creates mapping between container names
* and pipelines.
*
* @param nodeManager - NodeManager so that we can get the nodes that are
* healthy to place new
* containers.
* @param cacheSizeMB - Amount of memory reserved for the LSM tree to cache
* its nodes. This is
* passed to LevelDB and this memory is allocated in Native code space.
* CacheSize is specified
* in MB.
* @throws IOException on Failure.
*/
@SuppressWarnings("unchecked")
public SCMContainerManager(
final Configuration conf, final NodeManager nodeManager, final int
cacheSizeMB, EventPublisher eventPublisher) throws IOException {
this.nodeManager = nodeManager;
this.cacheSize = cacheSizeMB;
File metaDir = getOzoneMetaDirPath(conf);
// Write the container name to pipeline mapping.
File containerDBPath = new File(metaDir, SCM_CONTAINER_DB);
containerStore =
MetadataStoreBuilder.newBuilder()
.setConf(conf)
.setDbFile(containerDBPath)
.setCacheSize(this.cacheSize * OzoneConsts.MB)
.build();
this.lock = new ReentrantLock();
size = (long)conf.getStorageSize(OZONE_SCM_CONTAINER_SIZE,
OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES);
this.pipelineSelector = new PipelineSelector(nodeManager,
conf, eventPublisher, cacheSizeMB);
this.containerStateManager =
new ContainerStateManager(conf, this, pipelineSelector);
LOG.trace("Container State Manager created.");
this.eventPublisher = eventPublisher;
long containerCreationLeaseTimeout = conf.getTimeDuration(
ScmConfigKeys.OZONE_SCM_CONTAINER_CREATION_LEASE_TIMEOUT,
ScmConfigKeys.OZONE_SCM_CONTAINER_CREATION_LEASE_TIMEOUT_DEFAULT,
TimeUnit.MILLISECONDS);
containerLeaseManager = new LeaseManager<>("ContainerCreation",
containerCreationLeaseTimeout);
containerLeaseManager.start();
}
/**
* {@inheritDoc}
*/
@Override
public ContainerInfo getContainer(final long containerID) throws
IOException {
ContainerInfo containerInfo;
lock.lock();
try {
byte[] containerBytes = containerStore.get(
Longs.toByteArray(containerID));
if (containerBytes == null) {
throw new SCMException(
"Specified key does not exist. key : " + containerID,
SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER);
}
HddsProtos.SCMContainerInfo temp = HddsProtos.SCMContainerInfo.PARSER
.parseFrom(containerBytes);
containerInfo = ContainerInfo.fromProtobuf(temp);
return containerInfo;
} finally {
lock.unlock();
}
}
/**
* Returns the ContainerInfo and pipeline from the containerID. If container
* has no available replicas in datanodes it returns pipeline with no
* datanodes and empty leaderID . Pipeline#isEmpty can be used to check for
* an empty pipeline.
*
* @param containerID - ID of container.
* @return - ContainerWithPipeline such as creation state and the pipeline.
* @throws IOException
*/
@Override
public ContainerWithPipeline getContainerWithPipeline(long containerID)
throws IOException {
ContainerInfo contInfo;
lock.lock();
try {
byte[] containerBytes = containerStore.get(
Longs.toByteArray(containerID));
if (containerBytes == null) {
throw new SCMException(
"Specified key does not exist. key : " + containerID,
SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER);
}
HddsProtos.SCMContainerInfo temp = HddsProtos.SCMContainerInfo.PARSER
.parseFrom(containerBytes);
contInfo = ContainerInfo.fromProtobuf(temp);
Pipeline pipeline;
String leaderId = "";
if (contInfo.isContainerOpen()) {
// If pipeline with given pipeline Id already exist return it
pipeline = pipelineSelector.getPipeline(contInfo.getPipelineID());
} else {
// For close containers create pipeline from datanodes with replicas
Set<DatanodeDetails> dnWithReplicas = containerStateManager
.getContainerReplicas(contInfo.containerID());
if (!dnWithReplicas.isEmpty()) {
leaderId = dnWithReplicas.iterator().next().getUuidString();
}
pipeline = new Pipeline(leaderId, contInfo.getState(),
ReplicationType.STAND_ALONE, contInfo.getReplicationFactor(),
PipelineID.randomId());
dnWithReplicas.forEach(pipeline::addMember);
}
return new ContainerWithPipeline(contInfo, pipeline);
} finally {
lock.unlock();
}
}
/**
* {@inheritDoc}
*/
@Override
public List<ContainerInfo> listContainer(long startContainerID,
int count) throws IOException {
List<ContainerInfo> containerList = new ArrayList<>();
lock.lock();
try {
if (containerStore.isEmpty()) {
throw new IOException("No container exists in current db");
}
byte[] startKey = startContainerID <= 0 ? null :
Longs.toByteArray(startContainerID);
List<Map.Entry<byte[], byte[]>> range =
containerStore.getSequentialRangeKVs(startKey, count, null);
// Transform the values into the pipelines.
// TODO: filter by container state
for (Map.Entry<byte[], byte[]> entry : range) {
ContainerInfo containerInfo =
ContainerInfo.fromProtobuf(
HddsProtos.SCMContainerInfo.PARSER.parseFrom(
entry.getValue()));
Preconditions.checkNotNull(containerInfo);
containerList.add(containerInfo);
}
} finally {
lock.unlock();
}
return containerList;
}
/**
* Allocates a new container.
*
* @param replicationFactor - replication factor of the container.
* @param owner - The string name of the Service that owns this container.
* @return - Pipeline that makes up this container.
* @throws IOException - Exception
*/
@Override
public ContainerWithPipeline allocateContainer(
ReplicationType type,
ReplicationFactor replicationFactor,
String owner)
throws IOException {
ContainerInfo containerInfo;
ContainerWithPipeline containerWithPipeline;
lock.lock();
try {
containerWithPipeline = containerStateManager.allocateContainer(
pipelineSelector, type, replicationFactor, owner);
containerInfo = containerWithPipeline.getContainerInfo();
byte[] containerIDBytes = Longs.toByteArray(
containerInfo.getContainerID());
containerStore.put(containerIDBytes, containerInfo.getProtobuf()
.toByteArray());
} finally {
lock.unlock();
}
return containerWithPipeline;
}
/**
* Deletes a container from SCM.
*
* @param containerID - Container ID
* @throws IOException if container doesn't exist or container store failed
* to delete the
* specified key.
*/
@Override
public void deleteContainer(long containerID) throws IOException {
lock.lock();
try {
byte[] dbKey = Longs.toByteArray(containerID);
byte[] containerBytes = containerStore.get(dbKey);
if (containerBytes == null) {
throw new SCMException(
"Failed to delete container " + containerID + ", reason : " +
"container doesn't exist.",
SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER);
}
containerStore.delete(dbKey);
} finally {
lock.unlock();
}
}
/**
* {@inheritDoc} Used by client to update container state on SCM.
*/
@Override
public HddsProtos.LifeCycleState updateContainerState(
long containerID, HddsProtos.LifeCycleEvent event) throws
IOException {
ContainerInfo containerInfo;
lock.lock();
try {
byte[] dbKey = Longs.toByteArray(containerID);
byte[] containerBytes = containerStore.get(dbKey);
if (containerBytes == null) {
throw new SCMException(
"Failed to update container state"
+ containerID
+ ", reason : container doesn't exist.",
SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER);
}
containerInfo =
ContainerInfo.fromProtobuf(HddsProtos.SCMContainerInfo.PARSER
.parseFrom(containerBytes));
Preconditions.checkNotNull(containerInfo);
switch (event) {
case CREATE:
// Acquire lease on container
Lease<ContainerInfo> containerLease =
containerLeaseManager.acquire(containerInfo);
// Register callback to be executed in case of timeout
containerLease.registerCallBack(() -> {
updateContainerState(containerID,
HddsProtos.LifeCycleEvent.TIMEOUT);
return null;
});
break;
case CREATED:
// Release the lease on container
containerLeaseManager.release(containerInfo);
break;
case FINALIZE:
// TODO: we don't need a lease manager here for closing as the
// container report will include the container state after HDFS-13008
// If a client failed to update the container close state, DN container
// report from 3 DNs will be used to close the container eventually.
break;
case CLOSE:
break;
case UPDATE:
break;
case DELETE:
break;
case TIMEOUT:
break;
case CLEANUP:
break;
default:
throw new SCMException("Unsupported container LifeCycleEvent.",
FAILED_TO_CHANGE_CONTAINER_STATE);
}
// If the below updateContainerState call fails, we should revert the
// changes made in switch case.
// Like releasing the lease in case of BEGIN_CREATE.
ContainerInfo updatedContainer = containerStateManager
.updateContainerState(containerInfo, event);
if (!updatedContainer.isContainerOpen()) {
pipelineSelector.removeContainerFromPipeline(
containerInfo.getPipelineID(), containerID);
}
containerStore.put(dbKey, updatedContainer.getProtobuf().toByteArray());
return updatedContainer.getState();
} catch (LeaseException e) {
throw new IOException("Lease Exception.", e);
} finally {
lock.unlock();
}
}
/**
* Update deleteTransactionId according to deleteTransactionMap.
*
* @param deleteTransactionMap Maps the containerId to latest delete
* transaction id for the container.
* @throws IOException
*/
public void updateDeleteTransactionId(Map<Long, Long> deleteTransactionMap)
throws IOException {
if (deleteTransactionMap == null) {
return;
}
lock.lock();
try {
BatchOperation batch = new BatchOperation();
for (Map.Entry<Long, Long> entry : deleteTransactionMap.entrySet()) {
long containerID = entry.getKey();
byte[] dbKey = Longs.toByteArray(containerID);
byte[] containerBytes = containerStore.get(dbKey);
if (containerBytes == null) {
throw new SCMException(
"Failed to increment number of deleted blocks for container "
+ containerID + ", reason : " + "container doesn't exist.",
SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER);
}
ContainerInfo containerInfo = ContainerInfo.fromProtobuf(
HddsProtos.SCMContainerInfo.parseFrom(containerBytes));
containerInfo.updateDeleteTransactionId(entry.getValue());
batch.put(dbKey, containerInfo.getProtobuf().toByteArray());
}
containerStore.writeBatch(batch);
containerStateManager
.updateDeleteTransactionId(deleteTransactionMap);
} finally {
lock.unlock();
}
}
/**
* Returns the container State Manager.
*
* @return ContainerStateManager
*/
@Override
public ContainerStateManager getStateManager() {
return containerStateManager;
}
/**
* Return a container matching the attributes specified.
*
* @param sizeRequired - Space needed in the Container.
* @param owner - Owner of the container - A specific nameservice.
* @param type - Replication Type {StandAlone, Ratis}
* @param factor - Replication Factor {ONE, THREE}
* @param state - State of the Container-- {Open, Allocated etc.}
* @return ContainerInfo, null if there is no match found.
*/
public ContainerWithPipeline getMatchingContainerWithPipeline(
final long sizeRequired, String owner, ReplicationType type,
ReplicationFactor factor, LifeCycleState state) throws IOException {
ContainerInfo containerInfo = getStateManager()
.getMatchingContainer(sizeRequired, owner, type, factor, state);
if (containerInfo == null) {
return null;
}
Pipeline pipeline = pipelineSelector
.getPipeline(containerInfo.getPipelineID());
return new ContainerWithPipeline(containerInfo, pipeline);
}
/**
* Process container report from Datanode.
* <p>
* Processing follows a very simple logic for time being.
* <p>
* 1. Datanodes report the current State -- denoted by the datanodeState
* <p>
* 2. We are the older SCM state from the Database -- denoted by
* the knownState.
* <p>
* 3. We copy the usage etc. from currentState to newState and log that
* newState to the DB. This allows us SCM to bootup again and read the
* state of the world from the DB, and then reconcile the state from
* container reports, when they arrive.
*
* @param reports Container report
*/
@Override
public void processContainerReports(DatanodeDetails datanodeDetails,
ContainerReportsProto reports, boolean isRegisterCall)
throws IOException {
List<StorageContainerDatanodeProtocolProtos.ContainerInfo>
containerInfos = reports.getReportsList();
PendingDeleteStatusList pendingDeleteStatusList =
new PendingDeleteStatusList(datanodeDetails);
for (StorageContainerDatanodeProtocolProtos.ContainerInfo contInfo :
containerInfos) {
// Update replica info during registration process.
if (isRegisterCall) {
try {
getStateManager().addContainerReplica(ContainerID.
valueof(contInfo.getContainerID()), datanodeDetails);
} catch (Exception ex) {
// Continue to next one after logging the error.
LOG.error("Error while adding replica for containerId {}.",
contInfo.getContainerID(), ex);
}
}
byte[] dbKey = Longs.toByteArray(contInfo.getContainerID());
lock.lock();
try {
byte[] containerBytes = containerStore.get(dbKey);
if (containerBytes != null) {
HddsProtos.SCMContainerInfo knownState =
HddsProtos.SCMContainerInfo.PARSER.parseFrom(containerBytes);
if (knownState.getState() == LifeCycleState.CLOSING
&& contInfo.getState() == LifeCycleState.CLOSED) {
updateContainerState(contInfo.getContainerID(),
LifeCycleEvent.CLOSE);
//reread the container
knownState =
HddsProtos.SCMContainerInfo.PARSER
.parseFrom(containerStore.get(dbKey));
}
HddsProtos.SCMContainerInfo newState =
reconcileState(contInfo, knownState, datanodeDetails);
if (knownState.getDeleteTransactionId() > contInfo
.getDeleteTransactionId()) {
pendingDeleteStatusList
.addPendingDeleteStatus(contInfo.getDeleteTransactionId(),
knownState.getDeleteTransactionId(),
knownState.getContainerID());
}
// FIX ME: This can be optimized, we write twice to memory, where a
// single write would work well.
//
// We need to write this to DB again since the closed only write
// the updated State.
containerStore.put(dbKey, newState.toByteArray());
} else {
// Container not found in our container db.
LOG.error("Error while processing container report from datanode :" +
" {}, for container: {}, reason: container doesn't exist in" +
"container database.", datanodeDetails,
contInfo.getContainerID());
}
} finally {
lock.unlock();
}
}
if (pendingDeleteStatusList.getNumPendingDeletes() > 0) {
eventPublisher.fireEvent(SCMEvents.PENDING_DELETE_STATUS,
pendingDeleteStatusList);
}
}
/**
* Reconciles the state from Datanode with the state in SCM.
*
* @param datanodeState - State from the Datanode.
* @param knownState - State inside SCM.
* @param dnDetails
* @return new SCM State for this container.
*/
private HddsProtos.SCMContainerInfo reconcileState(
StorageContainerDatanodeProtocolProtos.ContainerInfo datanodeState,
SCMContainerInfo knownState, DatanodeDetails dnDetails) {
HddsProtos.SCMContainerInfo.Builder builder =
HddsProtos.SCMContainerInfo.newBuilder();
builder.setContainerID(knownState.getContainerID())
.setPipelineID(knownState.getPipelineID())
.setReplicationType(knownState.getReplicationType())
.setReplicationFactor(knownState.getReplicationFactor());
// TODO: If current state doesn't have this DN in list of DataNodes with
// replica then add it in list of replicas.
// If used size is greater than allocated size, we will be updating
// allocated size with used size. This update is done as a fallback
// mechanism in case SCM crashes without properly updating allocated
// size. Correct allocated value will be updated by
// ContainerStateManager during SCM shutdown.
long usedSize = datanodeState.getUsed();
long allocated = knownState.getAllocatedBytes() > usedSize ?
knownState.getAllocatedBytes() : usedSize;
builder.setAllocatedBytes(allocated)
.setUsedBytes(usedSize)
.setNumberOfKeys(datanodeState.getKeyCount())
.setState(knownState.getState())
.setStateEnterTime(knownState.getStateEnterTime())
.setContainerID(knownState.getContainerID())
.setDeleteTransactionId(knownState.getDeleteTransactionId());
if (knownState.getOwner() != null) {
builder.setOwner(knownState.getOwner());
}
return builder.build();
}
/**
* In Container is in closed state, if it is in closed, Deleting or Deleted
* State.
*
* @param info - ContainerInfo.
* @return true if is in open state, false otherwise
*/
private boolean shouldClose(ContainerInfo info) {
return info.getState() == HddsProtos.LifeCycleState.OPEN;
}
private boolean isClosed(ContainerInfo info) {
return info.getState() == HddsProtos.LifeCycleState.CLOSED;
}
/**
* Closes this stream and releases any system resources associated with it.
* If the stream is
* already closed then invoking this method has no effect.
* <p>
* <p>As noted in {@link AutoCloseable#close()}, cases where the close may
* fail require careful
* attention. It is strongly advised to relinquish the underlying resources
* and to internally
* <em>mark</em> the {@code Closeable} as closed, prior to throwing the
* {@code IOException}.
*
* @throws IOException if an I/O error occurs
*/
@Override
public void close() throws IOException {
if (containerLeaseManager != null) {
containerLeaseManager.shutdown();
}
if (containerStateManager != null) {
flushContainerInfo();
containerStateManager.close();
}
if (containerStore != null) {
containerStore.close();
}
if (pipelineSelector != null) {
pipelineSelector.shutdown();
}
}
/**
* Since allocatedBytes of a container is only in memory, stored in
* containerStateManager, when closing SCMContainerManager, we need to update
* this in the container store.
*
* @throws IOException on failure.
*/
@VisibleForTesting
public void flushContainerInfo() throws IOException {
List<ContainerInfo> containers = containerStateManager.getAllContainers();
List<Long> failedContainers = new ArrayList<>();
for (ContainerInfo info : containers) {
// even if some container updated failed, others can still proceed
try {
byte[] dbKey = Longs.toByteArray(info.getContainerID());
byte[] containerBytes = containerStore.get(dbKey);
// TODO : looks like when a container is deleted, the container is
// removed from containerStore but not containerStateManager, so it can
// return info of a deleted container. may revisit this in the future,
// for now, just skip a not-found container
if (containerBytes != null) {
containerStore.put(dbKey, info.getProtobuf().toByteArray());
} else {
LOG.debug("Container state manager has container {} but not found " +
"in container store, a deleted container?",
info.getContainerID());
}
} catch (IOException ioe) {
failedContainers.add(info.getContainerID());
}
}
if (!failedContainers.isEmpty()) {
throw new IOException("Error in flushing container info from container " +
"state manager: " + failedContainers);
}
}
@VisibleForTesting
public MetadataStore getContainerStore() {
return containerStore;
}
public PipelineSelector getPipelineSelector() {
return pipelineSelector;
}
}