blob: b4c1ae57e348c31ab74a11d85fac2533b87be754 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.container.common;
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import com.google.common.collect.Lists;
import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.utils.BackgroundService;
import org.apache.hadoop.hdds.utils.MetadataKeyFilters;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.common.Checksum;
import org.apache.hadoop.ozone.container.ContainerTestHelper;
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.container.common.impl.ChunkLayOutVersion;
import org.apache.hadoop.ozone.container.common.impl.ContainerData;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.common.impl.TopNOrderedContainerDeletionChoosingPolicy;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
import org.apache.hadoop.ozone.container.common.interfaces.Handler;
import org.apache.hadoop.ozone.container.common.utils.ReferenceCountedDB;
import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy;
import org.apache.hadoop.ozone.container.keyvalue.ChunkLayoutTestInfo;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
import org.apache.hadoop.ozone.container.keyvalue.statemachine.background.BlockDeletingService;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.ozone.container.testutils.BlockDeletingServiceTestImpl;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import static org.apache.hadoop.ozone.OzoneConsts.DB_BLOCK_COUNT_KEY;
import static org.apache.hadoop.ozone.OzoneConsts.DB_PENDING_DELETE_BLOCK_COUNT_KEY;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/**
* Tests to test block deleting service.
*/
@RunWith(Parameterized.class)
public class TestBlockDeletingService {
private static File testRoot;
private static String scmId;
private static String clusterID;
private Handler handler;
private final ChunkLayOutVersion layout;
public TestBlockDeletingService(ChunkLayOutVersion layout) {
this.layout = layout;
}
@Parameterized.Parameters
public static Iterable<Object[]> parameters() {
return ChunkLayoutTestInfo.chunkLayoutParameters();
}
@BeforeClass
public static void init() throws IOException {
testRoot = GenericTestUtils
.getTestDir(TestBlockDeletingService.class.getSimpleName());
if (testRoot.exists()) {
FileUtils.cleanDirectory(testRoot);
}
scmId = UUID.randomUUID().toString();
clusterID = UUID.randomUUID().toString();
}
@AfterClass
public static void cleanup() throws IOException {
FileUtils.deleteDirectory(testRoot);
}
/**
* A helper method to create some blocks and put them under deletion
* state for testing. This method directly updates container.db and
* creates some fake chunk files for testing.
*/
private void createToDeleteBlocks(ContainerSet containerSet,
ConfigurationSource conf, int numOfContainers,
int numOfBlocksPerContainer,
int numOfChunksPerBlock) throws IOException {
for (int x = 0; x < numOfContainers; x++) {
conf.set(ScmConfigKeys.HDDS_DATANODE_DIR_KEY, testRoot.getAbsolutePath());
long containerID = ContainerTestHelper.getTestContainerID();
KeyValueContainerData data = new KeyValueContainerData(containerID,
layout,
ContainerTestHelper.CONTAINER_MAX_SIZE, UUID.randomUUID().toString(),
UUID.randomUUID().toString());
data.closeContainer();
KeyValueContainer container = new KeyValueContainer(data, conf);
container.create(new MutableVolumeSet(scmId, clusterID, conf),
new RoundRobinVolumeChoosingPolicy(), scmId);
containerSet.addContainer(container);
data = (KeyValueContainerData) containerSet.getContainer(
containerID).getContainerData();
long blockLength = 100;
try(ReferenceCountedDB metadata = BlockUtils.getDB(data, conf)) {
for (int j = 0; j < numOfBlocksPerContainer; j++) {
BlockID blockID =
ContainerTestHelper.getTestBlockID(containerID);
String deleteStateName = OzoneConsts.DELETING_KEY_PREFIX +
blockID.getLocalID();
BlockData kd = new BlockData(blockID);
List<ContainerProtos.ChunkInfo> chunks = Lists.newArrayList();
for (int k = 0; k < numOfChunksPerBlock; k++) {
ContainerProtos.ChunkInfo info =
ContainerProtos.ChunkInfo.newBuilder()
.setChunkName(blockID.getLocalID() + "_chunk_" + k)
.setLen(blockLength)
.setOffset(0)
.setChecksumData(Checksum.getNoChecksumDataProto())
.build();
chunks.add(info);
}
kd.setChunks(chunks);
metadata.getStore().put(DFSUtil.string2Bytes(deleteStateName),
kd.getProtoBufMessage().toByteArray());
container.getContainerData().incrPendingDeletionBlocks(1);
}
container.getContainerData().setKeyCount(numOfBlocksPerContainer);
container.getContainerData().setBytesUsed(
blockLength * numOfBlocksPerContainer);
// Set block count, bytes used and pending delete block count.
metadata.getStore().put(DB_BLOCK_COUNT_KEY,
Longs.toByteArray(numOfBlocksPerContainer));
metadata.getStore().put(OzoneConsts.DB_CONTAINER_BYTES_USED_KEY,
Longs.toByteArray(blockLength * numOfBlocksPerContainer));
metadata.getStore().put(DB_PENDING_DELETE_BLOCK_COUNT_KEY,
Ints.toByteArray(numOfBlocksPerContainer));
}
}
}
/**
* Run service runDeletingTasks and wait for it's been processed.
*/
private void deleteAndWait(BlockDeletingServiceTestImpl service,
int timesOfProcessed) throws TimeoutException, InterruptedException {
service.runDeletingTasks();
GenericTestUtils.waitFor(()
-> service.getTimesOfProcessed() == timesOfProcessed, 100, 3000);
}
/**
* Get under deletion blocks count from DB,
* note this info is parsed from container.db.
*/
private int getUnderDeletionBlocksCount(ReferenceCountedDB meta)
throws IOException {
List<Map.Entry<byte[], byte[]>> underDeletionBlocks =
meta.getStore().getRangeKVs(null, 100,
new MetadataKeyFilters.KeyPrefixFilter()
.addFilter(OzoneConsts.DELETING_KEY_PREFIX));
return underDeletionBlocks.size();
}
private int getDeletedBlocksCount(ReferenceCountedDB db) throws IOException {
List<Map.Entry<byte[], byte[]>> underDeletionBlocks =
db.getStore().getRangeKVs(null, 100,
new MetadataKeyFilters.KeyPrefixFilter()
.addFilter(OzoneConsts.DELETED_KEY_PREFIX));
return underDeletionBlocks.size();
}
@Test
public void testBlockDeletion() throws Exception {
OzoneConfiguration conf = new OzoneConfiguration();
conf.setInt(OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL, 10);
conf.setInt(OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER, 2);
ContainerSet containerSet = new ContainerSet();
createToDeleteBlocks(containerSet, conf, 1, 3, 1);
BlockDeletingServiceTestImpl svc =
getBlockDeletingService(containerSet, conf);
svc.start();
GenericTestUtils.waitFor(svc::isStarted, 100, 3000);
// Ensure 1 container was created
List<ContainerData> containerData = Lists.newArrayList();
containerSet.listContainer(0L, 1, containerData);
Assert.assertEquals(1, containerData.size());
try(ReferenceCountedDB meta = BlockUtils.getDB(
(KeyValueContainerData) containerData.get(0), conf)) {
Map<Long, Container<?>> containerMap = containerSet.getContainerMapCopy();
// NOTE: this test assumes that all the container is KetValueContainer and
// have DeleteTransactionId in KetValueContainerData. If other
// types is going to be added, this test should be checked.
long transactionId = ((KeyValueContainerData) containerMap
.get(containerData.get(0).getContainerID()).getContainerData())
.getDeleteTransactionId();
// Number of deleted blocks in container should be equal to 0 before
// block delete
Assert.assertEquals(0, transactionId);
// Ensure there are 3 blocks under deletion and 0 deleted blocks
Assert.assertEquals(3, getUnderDeletionBlocksCount(meta));
Assert.assertEquals(0, getDeletedBlocksCount(meta));
// An interval will delete 1 * 2 blocks
deleteAndWait(svc, 1);
Assert.assertEquals(1, getUnderDeletionBlocksCount(meta));
Assert.assertEquals(2, getDeletedBlocksCount(meta));
deleteAndWait(svc, 2);
Assert.assertEquals(0, getUnderDeletionBlocksCount(meta));
Assert.assertEquals(3, getDeletedBlocksCount(meta));
deleteAndWait(svc, 3);
Assert.assertEquals(0, getUnderDeletionBlocksCount(meta));
Assert.assertEquals(3, getDeletedBlocksCount(meta));
// Check finally DB counters.
// Not checking bytes used, as handler is a mock call.
Assert.assertEquals(0, Ints.fromByteArray(
meta.getStore().get(DB_PENDING_DELETE_BLOCK_COUNT_KEY)));
Assert.assertEquals(0, Longs.fromByteArray(
meta.getStore().get(DB_BLOCK_COUNT_KEY)));
}
svc.shutdown();
}
@Test
@SuppressWarnings("java:S2699") // waitFor => assertion with timeout
public void testShutdownService() throws Exception {
OzoneConfiguration conf = new OzoneConfiguration();
conf.setTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL, 500,
TimeUnit.MILLISECONDS);
conf.setInt(OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL, 10);
conf.setInt(OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER, 10);
ContainerSet containerSet = new ContainerSet();
// Create 1 container with 100 blocks
createToDeleteBlocks(containerSet, conf, 1, 100, 1);
BlockDeletingServiceTestImpl service =
getBlockDeletingService(containerSet, conf);
service.start();
GenericTestUtils.waitFor(service::isStarted, 100, 3000);
// Run some deleting tasks and verify there are threads running
service.runDeletingTasks();
GenericTestUtils.waitFor(() -> service.getThreadCount() > 0, 100, 1000);
// Shutdown service and verify all threads are stopped
service.shutdown();
GenericTestUtils.waitFor(() -> service.getThreadCount() == 0, 100, 1000);
}
@Test
public void testBlockDeletionTimeout() throws Exception {
OzoneConfiguration conf = new OzoneConfiguration();
conf.setInt(OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL, 10);
conf.setInt(OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER, 2);
ContainerSet containerSet = new ContainerSet();
createToDeleteBlocks(containerSet, conf, 1, 3, 1);
// set timeout value as 1ns to trigger timeout behavior
long timeout = 1;
OzoneContainer ozoneContainer = mockDependencies(containerSet);
BlockDeletingService svc = new BlockDeletingService(ozoneContainer,
TimeUnit.MILLISECONDS.toNanos(1000), timeout, TimeUnit.NANOSECONDS,
conf);
svc.start();
LogCapturer log = LogCapturer.captureLogs(BackgroundService.LOG);
GenericTestUtils.waitFor(() -> {
if(log.getOutput().contains(
"Background task executes timed out, retrying in next interval")) {
log.stopCapturing();
return true;
}
return false;
}, 100, 1000);
log.stopCapturing();
svc.shutdown();
// test for normal case that doesn't have timeout limitation
timeout = 0;
createToDeleteBlocks(containerSet, conf, 1, 3, 1);
svc = new BlockDeletingService(ozoneContainer,
TimeUnit.MILLISECONDS.toNanos(1000), timeout, TimeUnit.MILLISECONDS,
conf);
svc.start();
// get container meta data
KeyValueContainer container =
(KeyValueContainer) containerSet.getContainerIterator().next();
KeyValueContainerData data = container.getContainerData();
try (ReferenceCountedDB meta = BlockUtils.getDB(data, conf)) {
LogCapturer newLog = LogCapturer.captureLogs(BackgroundService.LOG);
GenericTestUtils.waitFor(() -> {
try {
return getUnderDeletionBlocksCount(meta) == 0;
} catch (IOException ignored) {
}
return false;
}, 100, 1000);
newLog.stopCapturing();
// The block deleting successfully and shouldn't catch timed
// out warning log.
Assert.assertFalse(newLog.getOutput().contains(
"Background task executes timed out, retrying in next interval"));
}
svc.shutdown();
}
private BlockDeletingServiceTestImpl getBlockDeletingService(
ContainerSet containerSet, ConfigurationSource conf) {
OzoneContainer ozoneContainer = mockDependencies(containerSet);
return new BlockDeletingServiceTestImpl(ozoneContainer, 1000, conf);
}
private OzoneContainer mockDependencies(ContainerSet containerSet) {
OzoneContainer ozoneContainer = mock(OzoneContainer.class);
when(ozoneContainer.getContainerSet()).thenReturn(containerSet);
when(ozoneContainer.getWriteChannel()).thenReturn(null);
ContainerDispatcher dispatcher = mock(ContainerDispatcher.class);
when(ozoneContainer.getDispatcher()).thenReturn(dispatcher);
handler = mock(KeyValueHandler.class);
when(dispatcher.getHandler(any())).thenReturn(handler);
return ozoneContainer;
}
@Test(timeout = 30000)
public void testContainerThrottle() throws Exception {
// Properties :
// - Number of containers : 2
// - Number of blocks per container : 1
// - Number of chunks per block : 10
// - Container limit per interval : 1
// - Block limit per container : 1
//
// Each time only 1 container can be processed, so each time
// 1 block from 1 container can be deleted.
OzoneConfiguration conf = new OzoneConfiguration();
// Process 1 container per interval
conf.set(
ScmConfigKeys.OZONE_SCM_KEY_VALUE_CONTAINER_DELETION_CHOOSING_POLICY,
TopNOrderedContainerDeletionChoosingPolicy.class.getName());
conf.setInt(OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL, 1);
conf.setInt(OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER, 1);
ContainerSet containerSet = new ContainerSet();
int containerCount = 2;
int chunksPerBlock = 10;
int blocksPerContainer = 1;
createToDeleteBlocks(containerSet, conf, containerCount, blocksPerContainer,
chunksPerBlock);
BlockDeletingServiceTestImpl service =
getBlockDeletingService(containerSet, conf);
service.start();
try {
GenericTestUtils.waitFor(service::isStarted, 100, 3000);
for (int i = 1; i <= containerCount; i++) {
deleteAndWait(service, i);
verify(handler, times(i * blocksPerContainer))
.deleteBlock(any(), any());
}
} finally {
service.shutdown();
}
}
@Test(timeout = 30000)
public void testBlockThrottle() throws Exception {
// Properties :
// - Number of containers : 5
// - Number of blocks per container : 3
// - Number of chunks per block : 1
// - Container limit per interval : 10
// - Block limit per container : 2
//
// Each time containers can be all scanned, but only 2 blocks
// per container can be actually deleted. So it requires 2 waves
// to cleanup all blocks.
OzoneConfiguration conf = new OzoneConfiguration();
conf.setInt(OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL, 10);
int blockLimitPerTask = 2;
conf.setInt(OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER, blockLimitPerTask);
ContainerSet containerSet = new ContainerSet();
int containerCount = 5;
int blocksPerContainer = 3;
createToDeleteBlocks(containerSet, conf, containerCount,
blocksPerContainer, 1);
BlockDeletingServiceTestImpl service =
getBlockDeletingService(containerSet, conf);
service.start();
try {
GenericTestUtils.waitFor(service::isStarted, 100, 3000);
// Total blocks = 3 * 5 = 15
// block per task = 2
// number of containers = 5
// each interval will at most runDeletingTasks 5 * 2 = 10 blocks
deleteAndWait(service, 1);
verify(handler, times(blockLimitPerTask * containerCount))
.deleteBlock(any(), any());
// There is only 5 blocks left to runDeletingTasks
deleteAndWait(service, 2);
verify(handler, times(
blocksPerContainer * containerCount))
.deleteBlock(any(), any());
} finally {
service.shutdown();
}
}
}