blob: fae21b4fed7391c010493404a5872d570598c0d5 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.hdds.scm.block;
import javax.management.ObjectName;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.client.ContainerBlockID;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.StorageUnit;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ScmOps;
import org.apache.hadoop.hdds.scm.PipelineRequestInformation;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.ScmUtils;
import org.apache.hadoop.hdds.scm.PipelineChoosePolicy;
import org.apache.hadoop.hdds.scm.ScmConfig;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerManager;
import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager.SafeModeStatus;
import org.apache.hadoop.hdds.scm.safemode.SafeModePrecheck;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.hdds.utils.UniqueId;
import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.ozone.common.BlockGroup;
import org.apache.hadoop.util.StringUtils;
import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes.INVALID_BLOCK_SIZE;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** Block Manager manages the block access for SCM. */
public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
private static final Logger LOG =
LoggerFactory.getLogger(BlockManagerImpl.class);
// TODO : FIX ME : Hard coding the owner.
// Currently only user of the block service is Ozone, CBlock manages blocks
// by itself and does not rely on the Block service offered by SCM.
private final PipelineManager pipelineManager;
private final ContainerManager containerManager;
private final long containerSize;
private final DeletedBlockLog deletedBlockLog;
private final SCMBlockDeletingService blockDeletingService;
private ObjectName mxBean;
private SafeModePrecheck safeModePrecheck;
private PipelineChoosePolicy pipelineChoosePolicy;
/**
* Constructor.
*
* @param conf - configuration.
* @param scm
* @throws IOException
*/
public BlockManagerImpl(final ConfigurationSource conf,
final StorageContainerManager scm) {
Objects.requireNonNull(scm, "SCM cannot be null");
this.pipelineManager = scm.getPipelineManager();
this.containerManager = scm.getContainerManager();
this.pipelineChoosePolicy = scm.getPipelineChoosePolicy();
this.containerSize = (long)conf.getStorageSize(
ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT,
StorageUnit.BYTES);
mxBean = MBeans.register("BlockManager", "BlockManagerImpl", this);
// SCM block deleting transaction log and deleting service.
deletedBlockLog = new DeletedBlockLogImpl(conf, scm.getContainerManager(),
scm.getScmMetadataStore());
Duration svcInterval = conf.getObject(
ScmConfig.class).getBlockDeletionInterval();
long serviceTimeout =
conf.getTimeDuration(
OZONE_BLOCK_DELETING_SERVICE_TIMEOUT,
OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT,
TimeUnit.MILLISECONDS);
blockDeletingService =
new SCMBlockDeletingService(deletedBlockLog, containerManager,
scm.getScmNodeManager(), scm.getEventQueue(), svcInterval,
serviceTimeout, conf);
safeModePrecheck = new SafeModePrecheck(conf);
}
/**
* Start block manager services.
*
* @throws IOException
*/
public void start() throws IOException {
this.blockDeletingService.start();
}
/**
* Shutdown block manager services.
*
* @throws IOException
*/
public void stop() throws IOException {
this.blockDeletingService.shutdown();
this.close();
}
/**
* Allocates a block in a container and returns that info.
*
* @param size - Block Size
* @param type Replication Type
* @param factor - Replication Factor
* @param excludeList List of datanodes/containers to exclude during block
* allocation.
* @return Allocated block
* @throws IOException on failure.
*/
@Override
public AllocatedBlock allocateBlock(final long size, ReplicationType type,
ReplicationFactor factor, String owner, ExcludeList excludeList)
throws IOException {
if (LOG.isTraceEnabled()) {
LOG.trace("Size : {} , type : {}, factor : {} ", size, type, factor);
}
ScmUtils.preCheck(ScmOps.allocateBlock, safeModePrecheck);
if (size < 0 || size > containerSize) {
LOG.warn("Invalid block size requested : {}", size);
throw new SCMException("Unsupported block size: " + size,
INVALID_BLOCK_SIZE);
}
/*
Here is the high level logic.
1. We try to find pipelines in open state.
2. If there are no pipelines in OPEN state, then we try to create one.
3. We allocate a block from the available containers in the selected
pipeline.
TODO : #CLUTIL Support random picking of two containers from the list.
So we can use different kind of policies.
*/
ContainerInfo containerInfo;
while (true) {
List<Pipeline> availablePipelines =
pipelineManager
.getPipelines(type, factor, Pipeline.PipelineState.OPEN,
excludeList.getDatanodes(), excludeList.getPipelineIds());
Pipeline pipeline = null;
if (availablePipelines.size() == 0 && !excludeList.isEmpty()) {
// if no pipelines can be found, try finding pipeline without
// exclusion
availablePipelines = pipelineManager
.getPipelines(type, factor, Pipeline.PipelineState.OPEN);
}
if (availablePipelines.size() == 0) {
try {
// TODO: #CLUTIL Remove creation logic when all replication types and
// factors are handled by pipeline creator
pipeline = pipelineManager.createPipeline(type, factor);
// wait until pipeline is ready
pipelineManager.waitPipelineReady(pipeline.getId(), 0);
} catch (SCMException se) {
LOG.warn("Pipeline creation failed for type:{} factor:{}. " +
"Datanodes may be used up.", type, factor, se);
break;
} catch (IOException e) {
LOG.warn("Pipeline creation failed for type:{} factor:{}. Retrying " +
"get pipelines call once.", type, factor, e);
availablePipelines = pipelineManager
.getPipelines(type, factor, Pipeline.PipelineState.OPEN,
excludeList.getDatanodes(), excludeList.getPipelineIds());
if (availablePipelines.size() == 0 && !excludeList.isEmpty()) {
// if no pipelines can be found, try finding pipeline without
// exclusion
availablePipelines = pipelineManager
.getPipelines(type, factor, Pipeline.PipelineState.OPEN);
}
if (availablePipelines.size() == 0) {
LOG.info("Could not find available pipeline of type:{} and " +
"factor:{} even after retrying", type, factor);
break;
}
}
}
if (null == pipeline) {
PipelineRequestInformation pri =
PipelineRequestInformation.Builder.getBuilder()
.setSize(size)
.build();
pipeline = pipelineChoosePolicy.choosePipeline(
availablePipelines, pri);
}
// look for OPEN containers that match the criteria.
containerInfo = containerManager.getMatchingContainer(size, owner,
pipeline, excludeList.getContainerIds());
if (containerInfo != null) {
return newBlock(containerInfo);
}
}
// we have tried all strategies we know and but somehow we are not able
// to get a container for this block. Log that info and return a null.
LOG.error(
"Unable to allocate a block for the size: {}, type: {}, factor: {}",
size, type, factor);
return null;
}
/**
* newBlock - returns a new block assigned to a container.
*
* @param containerInfo - Container Info.
* @return AllocatedBlock
*/
private AllocatedBlock newBlock(ContainerInfo containerInfo) {
try {
final Pipeline pipeline = pipelineManager
.getPipeline(containerInfo.getPipelineID());
// TODO : Revisit this local ID allocation when HA is added.
long localID = UniqueId.next();
long containerID = containerInfo.getContainerID();
AllocatedBlock.Builder abb = new AllocatedBlock.Builder()
.setContainerBlockID(new ContainerBlockID(containerID, localID))
.setPipeline(pipeline);
if (LOG.isTraceEnabled()) {
LOG.trace("New block allocated : {} Container ID: {}", localID,
containerID);
}
pipelineManager.incNumBlocksAllocatedMetric(pipeline.getId());
return abb.build();
} catch (PipelineNotFoundException ex) {
LOG.error("Pipeline Machine count is zero.", ex);
return null;
}
}
/**
* Deletes a list of blocks in an atomic operation. Internally, SCM writes
* these blocks into a
* {@link DeletedBlockLog} and deletes them from SCM DB. If this is
* successful, given blocks are
* entering pending deletion state and becomes invisible from SCM namespace.
*
* @param keyBlocksInfoList . This is the list of BlockGroup which contains
* groupID of keys and list of BlockIDs associated with them.
* @throws IOException if exception happens, non of the blocks is deleted.
*/
@Override
public void deleteBlocks(List<BlockGroup> keyBlocksInfoList)
throws IOException {
ScmUtils.preCheck(ScmOps.deleteBlock, safeModePrecheck);
Map<Long, List<Long>> containerBlocks = new HashMap<>();
// TODO: track the block size info so that we can reclaim the container
// TODO: used space when the block is deleted.
for (BlockGroup bg : keyBlocksInfoList) {
if (LOG.isDebugEnabled()) {
LOG.debug("Deleting blocks {}",
StringUtils.join(",", bg.getBlockIDList()));
}
for (BlockID block : bg.getBlockIDList()) {
long containerID = block.getContainerID();
if (containerBlocks.containsKey(containerID)) {
containerBlocks.get(containerID).add(block.getLocalID());
} else {
List<Long> item = new ArrayList<>();
item.add(block.getLocalID());
containerBlocks.put(containerID, item);
}
}
}
try {
deletedBlockLog.addTransactions(containerBlocks);
} catch (IOException e) {
throw new IOException("Skip writing the deleted blocks info to"
+ " the delLog because addTransaction fails. " + keyBlocksInfoList
.size() + "Keys skipped", e);
}
// TODO: Container report handling of the deleted blocks:
// Remove tombstone and update open container usage.
// We will revisit this when the closed container replication is done.
}
@Override
public DeletedBlockLog getDeletedBlockLog() {
return this.deletedBlockLog;
}
/**
* Close the resources for BlockManager.
*
* @throws IOException
*/
@Override
public void close() throws IOException {
if (deletedBlockLog != null) {
deletedBlockLog.close();
}
blockDeletingService.shutdown();
if (mxBean != null) {
MBeans.unregister(mxBean);
mxBean = null;
}
}
@Override
public int getOpenContainersNo() {
return 0;
// TODO : FIX ME : The open container being a single number does not make
// sense.
// We have to get open containers by Replication Type and Replication
// factor. Hence returning 0 for now.
// containers.get(HddsProtos.LifeCycleState.OPEN).size();
}
@Override
public SCMBlockDeletingService getSCMBlockDeletingService() {
return this.blockDeletingService;
}
/**
* Returns status of scm safe mode determined by SAFE_MODE_STATUS event.
* */
public boolean isScmInSafeMode() {
return this.safeModePrecheck.isInSafeMode();
}
/**
* Get class logger.
* */
public static Logger getLogger() {
return LOG;
}
@Override
public void onMessage(SafeModeStatus status,
EventPublisher publisher) {
this.safeModePrecheck.setInSafeMode(status.isInSafeMode());
}
/**
* This class uses system current time milliseconds to generate unique id.
*/
}