| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
 *
| * http://www.apache.org/licenses/LICENSE-2.0 |
 *
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.ozone.container.ozoneimpl; |
| |
| import org.apache.hadoop.conf.StorageUnit; |
| import org.apache.hadoop.hdds.client.BlockID; |
| import org.apache.hadoop.hdds.conf.OzoneConfiguration; |
| import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; |
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.utils.db.Table;
| import org.apache.hadoop.ozone.OzoneConsts; |
| import org.apache.hadoop.ozone.container.common.helpers.BlockData; |
| import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo; |
| import org.apache.hadoop.ozone.container.common.impl.ChunkLayOutVersion; |
| import org.apache.hadoop.ozone.container.common.impl.ContainerSet; |
| import org.apache.hadoop.ozone.container.common.interfaces.Container; |
| import org.apache.hadoop.ozone.container.common.utils.ContainerCache; |
| import org.apache.hadoop.ozone.container.common.utils.ReferenceCountedDB; |
| import org.apache.hadoop.ozone.container.common.volume.HddsVolume; |
| import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet; |
| import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy; |
| import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer; |
| import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData; |
| import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils; |
| import org.junit.Assert; |
| import org.junit.Before; |
| import org.junit.Rule; |
| import org.junit.Test; |
| import org.junit.rules.TemporaryFolder; |
| import org.mockito.Mockito; |
| |
| import java.io.File; |
| import java.util.ArrayList; |
| import java.util.List; |
| import java.util.UUID; |
| |
| import static org.mockito.ArgumentMatchers.anyList; |
| import static org.mockito.ArgumentMatchers.anyLong; |
| import static org.mockito.Mockito.mock; |
| |
| /** |
| * Test ContainerReader class which loads containers from disks. |
| */ |
| public class TestContainerReader { |
| |
| @Rule |
| public final TemporaryFolder tempDir = new TemporaryFolder(); |
| |
| private MutableVolumeSet volumeSet; |
| private HddsVolume hddsVolume; |
| private ContainerSet containerSet; |
  private OzoneConfiguration conf;

| private RoundRobinVolumeChoosingPolicy volumeChoosingPolicy; |
| private UUID datanodeId; |
| private String clusterId = UUID.randomUUID().toString(); |
| private int blockCount = 10; |
| private long blockLen = 1024; |
| |
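  /**
   * Creates two containers on a mocked volume. Container {@code i} holds
   * {@code blockCount} blocks, of which {@code i} are marked for deletion;
   * even-numbered containers also persist the block count, bytes used and
   * pending-delete count as metadata keys, while odd-numbered ones leave
   * them unset so the reader has to recompute them.
   */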
| @Before |
| public void setup() throws Exception { |
| |
| File volumeDir = tempDir.newFolder(); |
    volumeSet = mock(MutableVolumeSet.class);
| containerSet = new ContainerSet(); |
| conf = new OzoneConfiguration(); |
| |
| datanodeId = UUID.randomUUID(); |
    hddsVolume = new HddsVolume.Builder(volumeDir.getAbsolutePath())
        .conf(conf).datanodeUuid(datanodeId.toString())
        .clusterID(clusterId).build();
| |
| volumeChoosingPolicy = mock(RoundRobinVolumeChoosingPolicy.class); |
| Mockito.when(volumeChoosingPolicy.chooseVolume(anyList(), anyLong())) |
| .thenReturn(hddsVolume); |
| |
    for (int i = 0; i < 2; i++) {
| KeyValueContainerData keyValueContainerData = new KeyValueContainerData(i, |
| ChunkLayOutVersion.FILE_PER_BLOCK, |
| (long) StorageUnit.GB.toBytes(5), UUID.randomUUID().toString(), |
| datanodeId.toString()); |
| |
| KeyValueContainer keyValueContainer = |
| new KeyValueContainer(keyValueContainerData, |
| conf); |
      keyValueContainer.create(volumeSet, volumeChoosingPolicy, clusterId);

| List<Long> blkNames; |
| if (i % 2 == 0) { |
| blkNames = addBlocks(keyValueContainer, true); |
| markBlocksForDelete(keyValueContainer, true, blkNames, i); |
| } else { |
| blkNames = addBlocks(keyValueContainer, false); |
| markBlocksForDelete(keyValueContainer, false, blkNames, i); |
| } |
      // Close the container's RocksDB instance and remove it from the cache
      // so it does not interfere with the ContainerReader, which bypasses
      // the cache at startup for performance reasons.
| BlockUtils.removeDB(keyValueContainerData, conf); |
| } |
| } |
| |
| |
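  /**
   * Marks the first {@code count} blocks of the container for deletion by
   * re-keying their block table entries with the deleting prefix, and
   * optionally records the pending-delete count in the metadata table.
   */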
| private void markBlocksForDelete(KeyValueContainer keyValueContainer, |
| boolean setMetaData, List<Long> blockNames, int count) throws Exception { |
    try (ReferenceCountedDB metadataStore = BlockUtils.getDB(keyValueContainer
        .getContainerData(), conf)) {
| |
      Table<String, BlockData> blockDataTable =
          metadataStore.getStore().getBlockDataTable();

      for (int i = 0; i < count; i++) {
| String blk = Long.toString(blockNames.get(i)); |
| BlockData blkInfo = blockDataTable.get(blk); |
| |
| blockDataTable.delete(blk); |
| blockDataTable.put(OzoneConsts.DELETING_KEY_PREFIX + blk, blkInfo); |
| } |
| |
| if (setMetaData) { |
| // Pending delete blocks are still counted towards the block count |
| // and bytes used metadata values, so those do not change. |
| Table<String, Long> metadataTable = |
| metadataStore.getStore().getMetadataTable(); |
        metadataTable.put(OzoneConsts.PENDING_DELETE_BLOCK_COUNT,
            (long) count);
| } |
| } |
| |
| } |
| |
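  /**
   * Writes {@code blockCount} blocks, each with a single chunk of
   * {@code blockLen} bytes, into the container's database and returns the
   * local IDs of the blocks created. When {@code setMetaData} is true, the
   * block count and bytes-used metadata keys are persisted as well.
   */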
| private List<Long> addBlocks(KeyValueContainer keyValueContainer, |
| boolean setMetaData) throws Exception { |
| long containerId = keyValueContainer.getContainerData().getContainerID(); |
| |
| List<Long> blkNames = new ArrayList<>(); |
    try (ReferenceCountedDB metadataStore = BlockUtils.getDB(keyValueContainer
        .getContainerData(), conf)) {
| |
| for (int i = 0; i < blockCount; i++) { |
        // Create a block with one chunk of blockLen bytes.
| BlockID blockID = new BlockID(containerId, i); |
| BlockData blockData = new BlockData(blockID); |
| blockData.addMetadata(OzoneConsts.VOLUME, OzoneConsts.OZONE); |
| blockData.addMetadata(OzoneConsts.OWNER, |
| OzoneConsts.OZONE_SIMPLE_HDFS_USER); |
| List<ContainerProtos.ChunkInfo> chunkList = new ArrayList<>(); |
| long localBlockID = blockID.getLocalID(); |
| ChunkInfo info = new ChunkInfo(String.format( |
| "%d.data.%d", localBlockID, 0), 0, blockLen); |
| chunkList.add(info.getProtoBufMessage()); |
| blockData.setChunks(chunkList); |
| blkNames.add(localBlockID); |
| metadataStore.getStore().getBlockDataTable() |
| .put(Long.toString(localBlockID), blockData); |
| } |
| |
| if (setMetaData) { |
| metadataStore.getStore().getMetadataTable() |
          .put(OzoneConsts.BLOCK_COUNT, (long) blockCount);
| metadataStore.getStore().getMetadataTable() |
| .put(OzoneConsts.CONTAINER_BYTES_USED, blockCount * blockLen); |
| } |
| } |
| |
| return blkNames; |
| } |
| |
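  /**
   * Loads the containers created in setup and verifies that the reader
   * restores the block count, bytes used and pending-delete count of each
   * container, whether or not those values were persisted as metadata.
   */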
| @Test |
| public void testContainerReader() throws Exception { |
| ContainerReader containerReader = new ContainerReader(volumeSet, |
| hddsVolume, containerSet, conf); |
| |
| Thread thread = new Thread(containerReader); |
| thread.start(); |
| thread.join(); |
| |
| Assert.assertEquals(2, containerSet.containerCount()); |
| |
    for (int i = 0; i < 2; i++) {
| Container keyValueContainer = containerSet.getContainer(i); |
| |
| KeyValueContainerData keyValueContainerData = (KeyValueContainerData) |
| keyValueContainer.getContainerData(); |
| |
      // Verify block-related metadata. For odd-numbered containers these
      // values were never persisted, so the reader must recompute them
      // from the block table.
| Assert.assertEquals(blockCount, |
| keyValueContainerData.getKeyCount()); |
| |
| Assert.assertEquals(blockCount * blockLen, |
| keyValueContainerData.getBytesUsed()); |
| |
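      // Setup marked i blocks for deletion in container i.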
| Assert.assertEquals(i, |
| keyValueContainerData.getNumPendingDeletionBlocks()); |
| } |
| } |
| |
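  /**
   * Verifies that a container directory the reader cannot parse is skipped
   * without preventing the remaining containers from being loaded.
   */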
| @Test |
| public void testContainerReaderWithLoadException() throws Exception { |
| MutableVolumeSet volumeSet1; |
| HddsVolume hddsVolume1; |
| ContainerSet containerSet1 = new ContainerSet(); |
| File volumeDir1 = tempDir.newFolder(); |
| RoundRobinVolumeChoosingPolicy volumeChoosingPolicy1; |
| |
    volumeSet1 = mock(MutableVolumeSet.class);
| UUID datanode = UUID.randomUUID(); |
    hddsVolume1 = new HddsVolume.Builder(volumeDir1.getAbsolutePath())
        .conf(conf).datanodeUuid(datanode.toString())
        .clusterID(clusterId).build();
| volumeChoosingPolicy1 = mock(RoundRobinVolumeChoosingPolicy.class); |
| Mockito.when(volumeChoosingPolicy1.chooseVolume(anyList(), anyLong())) |
| .thenReturn(hddsVolume1); |
| |
| int containerCount = 3; |
| for (int i = 0; i < containerCount; i++) { |
| KeyValueContainerData keyValueContainerData = new KeyValueContainerData(i, |
| ChunkLayOutVersion.FILE_PER_BLOCK, |
| (long) StorageUnit.GB.toBytes(5), UUID.randomUUID().toString(), |
| datanodeId.toString()); |
| KeyValueContainer keyValueContainer = |
| new KeyValueContainer(keyValueContainerData, conf); |
| keyValueContainer.create(volumeSet1, volumeChoosingPolicy1, clusterId); |
| BlockUtils.removeDB(keyValueContainerData, conf); |
| |
      if (i == 0) {
        // Rename the first container's directory so that loading it fails.
        String containerPathStr =
            keyValueContainer.getContainerData().getContainerPath();
        File containerPath = new File(containerPathStr);
        String renamePath = containerPathStr + "-aa";
        Assert.assertTrue(containerPath.renameTo(new File(renamePath)));
      }
| } |
| |
| ContainerReader containerReader = new ContainerReader(volumeSet1, |
| hddsVolume1, containerSet1, conf); |
| containerReader.readVolume(hddsVolume1.getHddsRootDir()); |
| Assert.assertEquals(containerCount - 1, containerSet1.containerCount()); |
| } |
| @Test |
| public void testMultipleContainerReader() throws Exception { |
| final int volumeNum = 10; |
    StringBuilder datanodeDirs = new StringBuilder();
| File[] volumeDirs = new File[volumeNum]; |
| for (int i = 0; i < volumeNum; i++) { |
| volumeDirs[i] = tempDir.newFolder(); |
      datanodeDirs.append(volumeDirs[i]).append(",");
| } |
| conf.set(ScmConfigKeys.HDDS_DATANODE_DIR_KEY, |
| datanodeDirs.toString()); |
| MutableVolumeSet volumeSets = |
| new MutableVolumeSet(datanodeId.toString(), clusterId, conf); |
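    // Start from an empty container cache so the assertion at the end only
    // observes entries the readers themselves might add.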
| ContainerCache cache = ContainerCache.getInstance(conf); |
| cache.clear(); |
| |
| RoundRobinVolumeChoosingPolicy policy = |
| new RoundRobinVolumeChoosingPolicy(); |
| |
| final int containerCount = 100; |
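    // markBlocksForDelete marks up to (containerCount - 1) blocks, so each
    // container needs at least that many blocks.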
| blockCount = containerCount; |
| for (int i = 0; i < containerCount; i++) { |
| KeyValueContainerData keyValueContainerData = |
| new KeyValueContainerData(i, ChunkLayOutVersion.FILE_PER_BLOCK, |
| (long) StorageUnit.GB.toBytes(5), UUID.randomUUID().toString(), |
| datanodeId.toString()); |
| |
| KeyValueContainer keyValueContainer = |
| new KeyValueContainer(keyValueContainerData, |
| conf); |
| keyValueContainer.create(volumeSets, policy, clusterId); |
| |
| List<Long> blkNames; |
| if (i % 2 == 0) { |
| blkNames = addBlocks(keyValueContainer, true); |
| markBlocksForDelete(keyValueContainer, true, blkNames, i); |
| } else { |
| blkNames = addBlocks(keyValueContainer, false); |
| markBlocksForDelete(keyValueContainer, false, blkNames, i); |
| } |
      // Close the container's RocksDB instance and remove it from the cache
      // so it does not interfere with the ContainerReader, which bypasses
      // the cache at startup for performance reasons.
| BlockUtils.removeDB(keyValueContainerData, conf); |
| } |
| |
| List<HddsVolume> hddsVolumes = volumeSets.getVolumesList(); |
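    // Start one ContainerReader per volume and load them all in parallel.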
| ContainerReader[] containerReaders = new ContainerReader[volumeNum]; |
| Thread[] threads = new Thread[volumeNum]; |
| for (int i = 0; i < volumeNum; i++) { |
| containerReaders[i] = new ContainerReader(volumeSets, |
| hddsVolumes.get(i), containerSet, conf); |
| threads[i] = new Thread(containerReaders[i]); |
| } |
| long startTime = System.currentTimeMillis(); |
| for (int i = 0; i < volumeNum; i++) { |
| threads[i].start(); |
| } |
| for (int i = 0; i < volumeNum; i++) { |
| threads[i].join(); |
| } |
| System.out.println("Open " + volumeNum + " Volume with " + containerCount + |
| " costs " + (System.currentTimeMillis() - startTime) / 1000 + "s"); |
    Assert.assertEquals(containerCount, containerSet.containerCount());
    // The ContainerReader should leave nothing in the cache: it opens and
    // closes each container's DB directly, bypassing the cache.
| Assert.assertEquals(0, cache.size()); |
| } |
| } |