/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.container.common;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import com.google.common.collect.Lists;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.MutableConfigurationSource;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.utils.BackgroundService;
import org.apache.hadoop.hdds.utils.MetadataKeyFilters;
import org.apache.hadoop.hdds.utils.db.BatchOperation;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.hdds.utils.db.TableIterator;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.common.Checksum;
import org.apache.hadoop.ozone.common.ChunkBuffer;
import org.apache.hadoop.ozone.container.ContainerTestHelper;
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
import org.apache.hadoop.ozone.container.common.impl.ChunkLayOutVersion;
import org.apache.hadoop.ozone.container.common.impl.ContainerData;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.common.impl.TopNOrderedContainerDeletionChoosingPolicy;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
import org.apache.hadoop.ozone.container.common.utils.ReferenceCountedDB;
import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy;
import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
import org.apache.hadoop.ozone.container.keyvalue.impl.FilePerBlockStrategy;
import org.apache.hadoop.ozone.container.keyvalue.impl.FilePerChunkStrategy;
import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager;
import org.apache.hadoop.ozone.container.keyvalue.statemachine.background.BlockDeletingService;
import org.apache.hadoop.ozone.container.metadata.DatanodeStore;
import org.apache.hadoop.ozone.container.metadata.DatanodeStoreSchemaTwoImpl;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.ozone.container.testutils.BlockDeletingServiceTestImpl;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.stream.Collectors.toList;
import static org.apache.commons.lang3.RandomStringUtils.randomAlphanumeric;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL;
import static org.apache.hadoop.ozone.OzoneConsts.SCHEMA_V1;
import static org.apache.hadoop.ozone.OzoneConsts.SCHEMA_V2;
import static org.apache.hadoop.ozone.OzoneConsts.SCHEMA_VERSIONS;
import static org.apache.hadoop.ozone.container.common.impl.ChunkLayOutVersion.FILE_PER_BLOCK;
import static org.apache.hadoop.ozone.container.common.states.endpoint.VersionEndpointTask.LOG;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

/**
* Tests for the block deleting service.
*/
@RunWith(Parameterized.class)
public class TestBlockDeletingService {
private static File testRoot;
private static String scmId;
private static String clusterID;
private static String datanodeUuid;
private static MutableConfigurationSource conf;
private final ChunkLayOutVersion layout;
private final String schemaVersion;
private int blockLimitPerInterval;
private static VolumeSet volumeSet;
public TestBlockDeletingService(LayoutInfo layoutInfo) {
this.layout = layoutInfo.layout;
this.schemaVersion = layoutInfo.schemaVersion;
}
@Parameterized.Parameters
public static Iterable<Object[]> parameters() {
return LayoutInfo.layoutList.stream().map(each -> new Object[] {each})
.collect(toList());
}
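/**
* Pairs a container schema version with a chunk layout version. The
* static block below builds the full cross product, so the
* parameterized suite runs once per combination.
*/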
public static class LayoutInfo {
private final String schemaVersion;
private final ChunkLayOutVersion layout;
public LayoutInfo(String schemaVersion, ChunkLayOutVersion layout) {
this.schemaVersion = schemaVersion;
this.layout = layout;
}
private static List<LayoutInfo> layoutList = new ArrayList<>();
static {
for (ChunkLayOutVersion ch : ChunkLayOutVersion.getAllVersions()) {
for (String sch : SCHEMA_VERSIONS) {
layoutList.add(new LayoutInfo(sch, ch));
}
}
}
}
@BeforeClass
public static void init() throws IOException {
testRoot = GenericTestUtils
.getTestDir(TestBlockDeletingService.class.getSimpleName());
if (testRoot.exists()) {
FileUtils.cleanDirectory(testRoot);
}
scmId = UUID.randomUUID().toString();
clusterID = UUID.randomUUID().toString();
conf = new OzoneConfiguration();
conf.set(ScmConfigKeys.HDDS_DATANODE_DIR_KEY, testRoot.getAbsolutePath());
datanodeUuid = UUID.randomUUID().toString();
volumeSet = new MutableVolumeSet(datanodeUuid, conf);
}
@AfterClass
public static void cleanup() throws IOException {
FileUtils.deleteDirectory(testRoot);
}
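// Chunk writes are dispatched in two stages, mirroring the Ratis
// write path: WRITE_DATA writes the chunk data and COMMIT_DATA
// commits it. putChunksInBlock() below runs both stages per chunk.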
private static final DispatcherContext WRITE_STAGE =
new DispatcherContext.Builder()
.setStage(DispatcherContext.WriteChunkStage.WRITE_DATA).build();
private static final DispatcherContext COMMIT_STAGE =
new DispatcherContext.Builder()
.setStage(DispatcherContext.WriteChunkStage.COMMIT_DATA).build();
/**
* A helper method to create some blocks and mark them as pending
* deletion for testing. This method directly updates container.db and
* creates some fake chunk files for testing.
*/
private void createToDeleteBlocks(ContainerSet containerSet,
int numOfContainers,
int numOfBlocksPerContainer,
int numOfChunksPerBlock) throws IOException {
ChunkManager chunkManager;
if (layout == FILE_PER_BLOCK) {
chunkManager = new FilePerBlockStrategy(true, null);
} else {
chunkManager = new FilePerChunkStrategy(true, null);
}
byte[] arr = randomAlphanumeric(1048576).getBytes(UTF_8);
ChunkBuffer buffer = ChunkBuffer.wrap(ByteBuffer.wrap(arr));
int txnID = 0;
for (int x = 0; x < numOfContainers; x++) {
long containerID = ContainerTestHelper.getTestContainerID();
KeyValueContainerData data =
new KeyValueContainerData(containerID, layout,
ContainerTestHelper.CONTAINER_MAX_SIZE,
UUID.randomUUID().toString(), datanodeUuid);
data.closeContainer();
data.setSchemaVersion(schemaVersion);
KeyValueContainer container = new KeyValueContainer(data, conf);
container.create(volumeSet,
new RoundRobinVolumeChoosingPolicy(), scmId);
containerSet.addContainer(container);
data = (KeyValueContainerData) containerSet.getContainer(
containerID).getContainerData();
data.setSchemaVersion(schemaVersion);
if (schemaVersion.equals(SCHEMA_V1)) {
createPendingDeleteBlocksSchema1(numOfBlocksPerContainer, data,
containerID, numOfChunksPerBlock, buffer, chunkManager, container);
} else if (schemaVersion.equals(SCHEMA_V2)) {
createPendingDeleteBlocksSchema2(numOfBlocksPerContainer, txnID,
containerID, numOfChunksPerBlock, buffer, chunkManager, container,
data);
} else {
throw new UnsupportedOperationException(
"Only schema version 1 and schema version 2 are "
+ "supported.");
}
}
}
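/**
* With schema version 1, blocks pending deletion live in the block
* data table itself, under a key prefixed with
* OzoneConsts.DELETING_KEY_PREFIX; the deleting service finds them
* with a key filter.
*/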
@SuppressWarnings("checkstyle:parameternumber")
private void createPendingDeleteBlocksSchema1(int numOfBlocksPerContainer,
KeyValueContainerData data, long containerID, int numOfChunksPerBlock,
ChunkBuffer buffer, ChunkManager chunkManager,
KeyValueContainer container) {
BlockID blockID = null;
try (ReferenceCountedDB metadata = BlockUtils.getDB(data, conf)) {
for (int j = 0; j < numOfBlocksPerContainer; j++) {
blockID = ContainerTestHelper.getTestBlockID(containerID);
String deleteStateName =
OzoneConsts.DELETING_KEY_PREFIX + blockID.getLocalID();
BlockData kd = new BlockData(blockID);
List<ContainerProtos.ChunkInfo> chunks = Lists.newArrayList();
putChunksInBlock(numOfChunksPerBlock, j, chunks, buffer, chunkManager,
container, blockID);
kd.setChunks(chunks);
metadata.getStore().getBlockDataTable().put(deleteStateName, kd);
container.getContainerData().incrPendingDeletionBlocks(1);
}
updateMetaData(data, container, numOfBlocksPerContainer,
numOfChunksPerBlock);
} catch (IOException exception) {
LOG.info("Exception " + exception);
LOG.warn("Failed to put block: " + blockID + " in BlockDataTable.");
}
}
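/**
* With schema version 2, blocks keep their plain local-ID keys in the
* block data table, and pending deletes are tracked separately as
* DeletedBlocksTransaction entries (one per block here; see
* createTxn()).
*/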
@SuppressWarnings("checkstyle:parameternumber")
private void createPendingDeleteBlocksSchema2(int numOfBlocksPerContainer,
int txnID, long containerID, int numOfChunksPerBlock, ChunkBuffer buffer,
ChunkManager chunkManager, KeyValueContainer container,
KeyValueContainerData data) {
List<Long> containerBlocks = new ArrayList<>();
for (int i = 0; i < numOfBlocksPerContainer; i++) {
txnID = txnID + 1;
BlockID blockID = ContainerTestHelper.getTestBlockID(containerID);
BlockData kd = new BlockData(blockID);
List<ContainerProtos.ChunkInfo> chunks = Lists.newArrayList();
putChunksInBlock(numOfChunksPerBlock, i, chunks, buffer, chunkManager,
container, blockID);
kd.setChunks(chunks);
String bID = null;
try (ReferenceCountedDB metadata = BlockUtils.getDB(data, conf)) {
bID = String.valueOf(blockID.getLocalID());
metadata.getStore().getBlockDataTable().put(bID, kd);
} catch (IOException exception) {
LOG.info("Exception = " + exception);
LOG.warn("Failed to put block: " + bID + " in BlockDataTable.");
}
container.getContainerData().incrPendingDeletionBlocks(1);
// Below we create one delete transaction per block, just for
// testing purposes.
containerBlocks.add(blockID.getLocalID());
createTxn(data, containerBlocks, txnID, containerID);
containerBlocks.clear();
}
updateMetaData(data, container, numOfBlocksPerContainer,
numOfChunksPerBlock);
}
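/**
* Records a DeletedBlocksTransaction for the given blocks in the
* schema-two delete transaction table, committed via a batch
* operation.
*/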
private void createTxn(KeyValueContainerData data, List<Long> containerBlocks,
int txnID, long containerID) {
try (ReferenceCountedDB metadata = BlockUtils.getDB(data, conf)) {
StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction dtx =
StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction
.newBuilder().setTxID(txnID).setContainerID(containerID)
.addAllLocalID(containerBlocks).setCount(0).build();
try (BatchOperation batch = metadata.getStore().getBatchHandler()
.initBatchOperation()) {
DatanodeStore ds = metadata.getStore();
DatanodeStoreSchemaTwoImpl dnStoreTwoImpl =
(DatanodeStoreSchemaTwoImpl) ds;
dnStoreTwoImpl.getDeleteTransactionTable()
.putWithBatch(batch, (long) txnID, dtx);
metadata.getStore().getBatchHandler().commitBatchOperation(batch);
}
} catch (IOException exception) {
LOG.warn("Transaction creation was not successful for txnID: " + txnID
+ " consisting of " + containerBlocks.size() + " blocks.");
}
}
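/**
* Writes numOfChunksPerBlock chunks for the given block and collects
* their ChunkInfo protos, invoking the chunk manager once for the
* WRITE_DATA stage and once for the COMMIT_DATA stage per chunk.
*/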
private void putChunksInBlock(int numOfChunksPerBlock, int i,
List<ContainerProtos.ChunkInfo> chunks, ChunkBuffer buffer,
ChunkManager chunkManager, KeyValueContainer container, BlockID blockID) {
long chunkLength = 100;
try {
for (int k = 0; k < numOfChunksPerBlock; k++) {
final String chunkName = String.format("block.%d.chunk.%d", i, k);
final long offset = k * chunkLength;
ContainerProtos.ChunkInfo info =
ContainerProtos.ChunkInfo.newBuilder().setChunkName(chunkName)
.setLen(chunkLength).setOffset(offset)
.setChecksumData(Checksum.getNoChecksumDataProto()).build();
chunks.add(info);
ChunkInfo chunkInfo = new ChunkInfo(chunkName, offset, chunkLength);
ChunkBuffer chunkData = buffer.duplicate(0, (int) chunkLength);
chunkManager
.writeChunk(container, blockID, chunkInfo, chunkData, WRITE_STAGE);
chunkManager
.writeChunk(container, blockID, chunkInfo, chunkData, COMMIT_STAGE);
}
} catch (IOException ex) {
LOG.warn("Putting chunks in blocks was not successful for BlockID: "
+ blockID);
}
}
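/**
* Persists the container-level counters (block count, bytes used and
* pending delete block count) to the container metadata table, so the
* deleting service starts from a consistent state.
*/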
private void updateMetaData(KeyValueContainerData data,
KeyValueContainer container, int numOfBlocksPerContainer,
int numOfChunksPerBlock) {
long chunkLength = 100;
try (ReferenceCountedDB metadata = BlockUtils.getDB(data, conf)) {
container.getContainerData().setKeyCount(numOfBlocksPerContainer);
// Set block count, bytes used and pending delete block count.
metadata.getStore().getMetadataTable()
.put(OzoneConsts.BLOCK_COUNT, (long) numOfBlocksPerContainer);
metadata.getStore().getMetadataTable()
.put(OzoneConsts.CONTAINER_BYTES_USED,
chunkLength * numOfChunksPerBlock * numOfBlocksPerContainer);
metadata.getStore().getMetadataTable()
.put(OzoneConsts.PENDING_DELETE_BLOCK_COUNT,
(long) numOfBlocksPerContainer);
} catch (IOException exception) {
LOG.warn("Meta Data update was not successful for container: "+container);
}
}
/**
* Run the service's deleting tasks and wait until the expected number
* of rounds has been processed.
*/
private void deleteAndWait(BlockDeletingServiceTestImpl service,
int timesOfProcessed) throws TimeoutException, InterruptedException {
service.runDeletingTasks();
GenericTestUtils.waitFor(
() -> service.getTimesOfProcessed() == timesOfProcessed, 100, 3000);
}
/**
* Get the number of blocks pending deletion from the DB.
* Note this info is parsed from container.db.
*/
private int getUnderDeletionBlocksCount(ReferenceCountedDB meta,
KeyValueContainerData data) throws IOException {
if (data.getSchemaVersion().equals(SCHEMA_V1)) {
return meta.getStore().getBlockDataTable()
.getRangeKVs(null, 100, MetadataKeyFilters.getDeletingKeyFilter())
.size();
} else if (data.getSchemaVersion().equals(SCHEMA_V2)) {
int pendingBlocks = 0;
DatanodeStore ds = meta.getStore();
DatanodeStoreSchemaTwoImpl dnStoreTwoImpl =
(DatanodeStoreSchemaTwoImpl) ds;
try (
TableIterator<Long, ? extends Table.KeyValue<Long,
StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction>>
iter = dnStoreTwoImpl.getDeleteTransactionTable().iterator()) {
while (iter.hasNext()) {
StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction
delTx = iter.next().getValue();
pendingBlocks += delTx.getLocalIDList().size();
}
}
return pendingBlocks;
} else {
throw new UnsupportedOperationException(
"Only schema version 1 and schema version 2 are supported.");
}
}
@Test
public void testBlockDeletion() throws Exception {
DatanodeConfiguration dnConf = conf.getObject(DatanodeConfiguration.class);
dnConf.setBlockDeletionLimit(3);
this.blockLimitPerInterval = dnConf.getBlockDeletionLimit();
conf.setFromObject(dnConf);
ContainerSet containerSet = new ContainerSet();
createToDeleteBlocks(containerSet, 1, 3, 1);
ContainerMetrics metrics = ContainerMetrics.create(conf);
KeyValueHandler keyValueHandler =
new KeyValueHandler(conf, datanodeUuid, containerSet, volumeSet,
metrics, c -> {
});
BlockDeletingServiceTestImpl svc =
getBlockDeletingService(containerSet, conf, keyValueHandler);
svc.start();
GenericTestUtils.waitFor(svc::isStarted, 100, 3000);
// Ensure 1 container was created
List<ContainerData> containerData = Lists.newArrayList();
containerSet.listContainer(0L, 1, containerData);
Assert.assertEquals(1, containerData.size());
KeyValueContainerData data = (KeyValueContainerData) containerData.get(0);
try (ReferenceCountedDB meta = BlockUtils.getDB(data, conf)) {
Map<Long, Container<?>> containerMap = containerSet.getContainerMapCopy();
// NOTE: this test assumes that every container is a KeyValueContainer
// and has a DeleteTransactionId in its KeyValueContainerData. If
// other container types are added, this test should be revisited.
long transactionId = ((KeyValueContainerData) containerMap
.get(containerData.get(0).getContainerID()).getContainerData())
.getDeleteTransactionId();
long containerSpace = containerData.get(0).getBytesUsed();
// The delete transaction ID should be 0 before any block deletion.
Assert.assertEquals(0, transactionId);
// Ensure there are 3 blocks under deletion and 0 deleted blocks
Assert.assertEquals(3, getUnderDeletionBlocksCount(meta, data));
Assert.assertEquals(3, meta.getStore().getMetadataTable()
.get(OzoneConsts.PENDING_DELETE_BLOCK_COUNT).longValue());
// Container contains 3 blocks. So, space used by the container
// should be greater than zero.
Assert.assertTrue(containerSpace > 0);
// One interval can delete up to blockLimitPerInterval (3) blocks.
deleteAndWait(svc, 1);
// After the first interval blocks have been deleted, so the space
// currently used by the container should be less than the space used
// initially (before running the deletion service).
Assert.assertTrue(containerData.get(0).getBytesUsed() < containerSpace);
deleteAndWait(svc, 2);
// After all 3 blocks have been deleted, the space used by the
// container should be zero.
containerSpace = containerData.get(0).getBytesUsed();
Assert.assertEquals(0, containerSpace);
// Finally, check the DB counters. Bytes used is not checked here,
// as the handler call is mocked.
Assert.assertEquals(0, meta.getStore().getMetadataTable()
.get(OzoneConsts.PENDING_DELETE_BLOCK_COUNT).longValue());
Assert.assertEquals(0,
meta.getStore().getMetadataTable().get(OzoneConsts.BLOCK_COUNT)
.longValue());
}
svc.shutdown();
}
@Test
@SuppressWarnings("java:S2699") // waitFor => assertion with timeout
public void testShutdownService() throws Exception {
conf.setTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL, 500,
TimeUnit.MILLISECONDS);
conf.setInt(OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL, 10);
conf.setInt(OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER, 10);
ContainerSet containerSet = new ContainerSet();
// Create 1 container with 100 blocks
createToDeleteBlocks(containerSet, 1, 100, 1);
ContainerMetrics metrics = ContainerMetrics.create(conf);
KeyValueHandler keyValueHandler =
new KeyValueHandler(conf, datanodeUuid, containerSet, volumeSet,
metrics, c -> {
});
BlockDeletingServiceTestImpl service =
getBlockDeletingService(containerSet, conf, keyValueHandler);
service.start();
GenericTestUtils.waitFor(service::isStarted, 100, 3000);
// Run some deleting tasks and verify there are threads running
service.runDeletingTasks();
GenericTestUtils.waitFor(() -> service.getThreadCount() > 0, 100, 1000);
// Shutdown service and verify all threads are stopped
service.shutdown();
GenericTestUtils.waitFor(() -> service.getThreadCount() == 0, 100, 1000);
}
@Test
public void testBlockDeletionTimeout() throws Exception {
DatanodeConfiguration dnConf = conf.getObject(DatanodeConfiguration.class);
dnConf.setBlockDeletionLimit(3);
blockLimitPerInterval = dnConf.getBlockDeletionLimit();
conf.setFromObject(dnConf);
ContainerSet containerSet = new ContainerSet();
createToDeleteBlocks(containerSet, 1, 3, 1);
ContainerMetrics metrics = ContainerMetrics.create(conf);
KeyValueHandler keyValueHandler =
new KeyValueHandler(conf, datanodeUuid, containerSet, volumeSet,
metrics, c -> {
});
// Set the timeout to 1 ns to trigger the timeout behavior.
long timeout = 1;
OzoneContainer ozoneContainer =
mockDependencies(containerSet, keyValueHandler);
BlockDeletingService svc = new BlockDeletingService(ozoneContainer,
TimeUnit.MILLISECONDS.toNanos(1000), timeout, TimeUnit.NANOSECONDS,
conf);
svc.start();
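// With a 1 ns timeout every task execution exceeds the limit, so
// BackgroundService should log the slow-execution warning captured
// below.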
LogCapturer log = LogCapturer.captureLogs(BackgroundService.LOG);
GenericTestUtils.waitFor(() -> {
if (log.getOutput().contains("Background task execution took")) {
log.stopCapturing();
return true;
}
return false;
}, 100, 1000);
log.stopCapturing();
svc.shutdown();
// Test the normal case, which has no timeout limitation.
createToDeleteBlocks(containerSet, 1, 3, 1);
timeout = 0;
svc = new BlockDeletingService(ozoneContainer,
TimeUnit.MILLISECONDS.toNanos(1000), timeout, TimeUnit.MILLISECONDS,
conf);
svc.start();
// Get the container metadata.
KeyValueContainer container =
(KeyValueContainer) containerSet.getContainerIterator().next();
KeyValueContainerData data = container.getContainerData();
try (ReferenceCountedDB meta = BlockUtils.getDB(data, conf)) {
LogCapturer newLog = LogCapturer.captureLogs(BackgroundService.LOG);
GenericTestUtils.waitFor(() -> {
try {
return getUnderDeletionBlocksCount(meta, data) == 0;
} catch (IOException ignored) {
}
return false;
}, 100, 1000);
newLog.stopCapturing();
// Block deletion should have succeeded, so the timed-out warning
// log should not appear.
Assert.assertFalse(newLog.getOutput().contains(
"Background task executes timed out, retrying in next interval"));
}
svc.shutdown();
}
private BlockDeletingServiceTestImpl getBlockDeletingService(
ContainerSet containerSet, ConfigurationSource config,
KeyValueHandler keyValueHandler) {
OzoneContainer ozoneContainer =
mockDependencies(containerSet, keyValueHandler);
return new BlockDeletingServiceTestImpl(ozoneContainer, 1000, config);
}
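/**
* Mocks an OzoneContainer that exposes just what the deleting service
* needs: the real ContainerSet, a null write channel, and a dispatcher
* that always returns the given KeyValueHandler.
*/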
private OzoneContainer mockDependencies(ContainerSet containerSet,
KeyValueHandler keyValueHandler) {
OzoneContainer ozoneContainer = mock(OzoneContainer.class);
when(ozoneContainer.getContainerSet()).thenReturn(containerSet);
when(ozoneContainer.getWriteChannel()).thenReturn(null);
ContainerDispatcher dispatcher = mock(ContainerDispatcher.class);
when(ozoneContainer.getDispatcher()).thenReturn(dispatcher);
when(dispatcher.getHandler(any())).thenReturn(keyValueHandler);
return ozoneContainer;
}
@Test(timeout = 30000)
public void testContainerThrottle() throws Exception {
// Properties :
//  - Number of containers : 2
//  - Number of blocks per container : 1
//  - Number of chunks per block : 10
//  - Block deletion limit per interval : 1
//
// Only 1 block can be deleted per interval, so each interval
// processes 1 container and deletes its single block.
conf.set(
ScmConfigKeys.OZONE_SCM_KEY_VALUE_CONTAINER_DELETION_CHOOSING_POLICY,
TopNOrderedContainerDeletionChoosingPolicy.class.getName());
DatanodeConfiguration dnConf = conf.getObject(DatanodeConfiguration.class);
dnConf.setBlockDeletionLimit(1);
this.blockLimitPerInterval = dnConf.getBlockDeletionLimit();
conf.setFromObject(dnConf);
ContainerSet containerSet = new ContainerSet();
int containerCount = 2;
int chunksPerBlock = 10;
int blocksPerContainer = 1;
createToDeleteBlocks(containerSet, containerCount, blocksPerContainer,
chunksPerBlock);
ContainerMetrics metrics = ContainerMetrics.create(conf);
KeyValueHandler keyValueHandler =
new KeyValueHandler(conf, datanodeUuid, containerSet, volumeSet,
metrics, c -> {
});
BlockDeletingServiceTestImpl service =
getBlockDeletingService(containerSet, conf, keyValueHandler);
service.start();
List<ContainerData> containerData = Lists.newArrayList();
containerSet.listContainer(0L, containerCount, containerData);
try {
GenericTestUtils.waitFor(service::isStarted, 100, 3000);
// Delete one of the two containers and its single block. The space
// used by the container whose block was deleted should then be zero,
// while the other container still holds its block.
deleteAndWait(service, 1);
Assert.assertTrue((containerData.get(0).getBytesUsed() == 0)
|| containerData.get(1).getBytesUsed() == 0);
Assert.assertFalse((containerData.get(0).getBytesUsed() == 0) && (
containerData.get(1).getBytesUsed() == 0));
// Delete the second container's block. Space used by both
// containers should now be zero.
deleteAndWait(service, 2);
Assert.assertTrue((containerData.get(0).getBytesUsed() == 0) && (
containerData.get(1).getBytesUsed() == 0));
} finally {
service.shutdown();
}
}
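/**
* Returns the total bytes currently used by the first
* {@code totalContainers} containers in the given list.
*/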
public long currentBlockSpace(List<ContainerData> containerData,
int totalContainers) {
long totalSpaceUsed = 0;
for (int i = 0; i < totalContainers; i++) {
totalSpaceUsed += containerData.get(i).getBytesUsed();
}
return totalSpaceUsed;
}
@Test(timeout = 30000)
public void testBlockThrottle() throws Exception {
// Properties :
//  - Number of containers : 5
//  - Number of blocks per container : 3
//  - Number of chunks per block : 1
//  - Block deletion limit per interval : 10
//
// Each interval can scan all the containers, but only 10 blocks can
// actually be deleted, so two waves are required to clean up all 15
// blocks.
DatanodeConfiguration dnConf = conf.getObject(DatanodeConfiguration.class);
dnConf.setBlockDeletionLimit(10);
this.blockLimitPerInterval = dnConf.getBlockDeletionLimit();
conf.setFromObject(dnConf);
ContainerSet containerSet = new ContainerSet();
ContainerMetrics metrics = ContainerMetrics.create(conf);
KeyValueHandler keyValueHandler =
new KeyValueHandler(conf, datanodeUuid, containerSet, volumeSet,
metrics, c -> {
});
int containerCount = 5;
int blocksPerContainer = 3;
createToDeleteBlocks(containerSet, containerCount,
blocksPerContainer, 1);
BlockDeletingServiceTestImpl service =
getBlockDeletingService(containerSet, conf, keyValueHandler);
service.start();
List<ContainerData> containerData = Lists.newArrayList();
containerSet.listContainer(0L, containerCount, containerData);
long blockSpace = containerData.get(0).getBytesUsed() / blocksPerContainer;
long totalContainerSpace =
containerCount * containerData.get(0).getBytesUsed();
try {
GenericTestUtils.waitFor(service::isStarted, 100, 3000);
// Total blocks = 3 * 5 = 15 and blockLimitPerInterval = 10, so each
// interval deletes at most 10 blocks. Per the deletion policy
// (random/topNOrdered), it fetches all 3 blocks from each of the
// first 3 containers and 1 block from the fourth:
// C1 - 3 blocks, C2 - 3 blocks, C3 - 3 blocks, C4 - 1 block.
// The space freed by those 10 blocks should equal (initial total
// container space - current total container space).
deleteAndWait(service, 1);
Assert.assertEquals(blockLimitPerInterval * blockSpace,
(totalContainerSpace - currentBlockSpace(containerData,
containerCount)));
// Only 5 blocks are left to delete. The space freed by the previous
// 10 blocks plus these remaining 5 should equal the initial total
// container space minus the current total space, which is zero once
// all blocks in all containers have been deleted.
deleteAndWait(service, 2);
Assert.assertEquals(blocksPerContainer * containerCount * blockSpace,
(totalContainerSpace - currentBlockSpace(containerData,
containerCount)));
} finally {
service.shutdown();
}
}
}