/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.container.keyvalue;
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.scm.container.common.helpers
.StorageContainerException;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.container.common.impl.ChunkLayOutVersion;
import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
import org.apache.hadoop.ozone.container.common.volume
.RoundRobinVolumeChoosingPolicy;
import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
import org.apache.hadoop.ozone.container.metadata.AbstractDatanodeStore;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.ozone.container.common.utils.ReferenceCountedDB;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.Mockito;
import org.rocksdb.ColumnFamilyOptions;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static org.apache.ratis.util.Preconditions.assertTrue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.mock;
/**
* Class to test KeyValue Container operations.
*/
@RunWith(Parameterized.class)
public class TestKeyValueContainer {
@Rule
public TemporaryFolder folder = new TemporaryFolder();
private String scmId = UUID.randomUUID().toString();
private VolumeSet volumeSet;
private RoundRobinVolumeChoosingPolicy volumeChoosingPolicy;
private KeyValueContainerData keyValueContainerData;
private KeyValueContainer keyValueContainer;
private UUID datanodeId;
private final ChunkLayOutVersion layout;
// Use one configuration object across parameterized runs of tests.
// This preserves the column family options in the container options
// cache for testContainersShareColumnFamilyOptions.
private static final OzoneConfiguration CONF = new OzoneConfiguration();
public TestKeyValueContainer(ChunkLayOutVersion layout) {
this.layout = layout;
}
@Parameterized.Parameters
public static Iterable<Object[]> parameters() {
return ChunkLayoutTestInfo.chunkLayoutParameters();
}
@Before
public void setUp() throws Exception {
datanodeId = UUID.randomUUID();
HddsVolume hddsVolume = new HddsVolume.Builder(folder.getRoot()
.getAbsolutePath()).conf(CONF).datanodeUuid(datanodeId
.toString()).build();
volumeSet = mock(MutableVolumeSet.class);
volumeChoosingPolicy = mock(RoundRobinVolumeChoosingPolicy.class);
Mockito.when(volumeChoosingPolicy.chooseVolume(anyList(), anyLong()))
.thenReturn(hddsVolume);
keyValueContainerData = new KeyValueContainerData(1L,
layout,
(long) StorageUnit.GB.toBytes(5), UUID.randomUUID().toString(),
datanodeId.toString());
keyValueContainer = new KeyValueContainer(keyValueContainerData, CONF);
}
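/**
 * Verifies that creating a container sets up the metadata path, chunks
 * path, .container file and container DB file on disk.
 */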
@Test
public void testCreateContainer() throws Exception {
createContainer();
String containerMetaDataPath = keyValueContainerData.getMetadataPath();
String chunksPath = keyValueContainerData.getChunksPath();
// Check that containerMetaDataPath and chunksPath were populated.
assertTrue(containerMetaDataPath != null);
assertTrue(chunksPath != null);
// Check that the container file and container DB file exist.
assertTrue(keyValueContainer.getContainerFile().exists(),
"Container File does not exist");
assertTrue(keyValueContainer.getContainerDBFile().exists(),
"Container DB does not exist");
}
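/**
 * Exports a closed container to a tarball and imports it as a new
 * container, verifying that its metadata survives the round trip. Also
 * checks that an existing container cannot be overwritten by an import
 * and that a failed import cleans up the partially created container
 * directory.
 */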
@Test
public void testContainerImportExport() throws Exception {
long containerId = keyValueContainer.getContainerData().getContainerID();
createContainer();
long numberOfKeysToWrite = 12;
closeContainer();
populate(numberOfKeysToWrite);
//destination file for the exported container data
File folderToExport = folder.newFile("exported.tar.gz");
TarContainerPacker packer = new TarContainerPacker();
//export the container
try (FileOutputStream fos = new FileOutputStream(folderToExport)) {
keyValueContainer
.exportContainerData(fos, packer);
}
//delete the original one
keyValueContainer.delete();
//create a new one
KeyValueContainerData containerData =
new KeyValueContainerData(containerId,
keyValueContainerData.getLayOutVersion(),
keyValueContainerData.getMaxSize(), UUID.randomUUID().toString(),
datanodeId.toString());
KeyValueContainer container = new KeyValueContainer(containerData, CONF);
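// Choose a volume and populate the container's path fields before import.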
HddsVolume containerVolume = volumeChoosingPolicy.chooseVolume(volumeSet
.getVolumesList(), 1);
String hddsVolumeDir = containerVolume.getHddsRootDir().toString();
container.populatePathFields(scmId, containerVolume, hddsVolumeDir);
try (FileInputStream fis = new FileInputStream(folderToExport)) {
container.importContainerData(fis, packer);
}
assertEquals("value1", containerData.getMetadata().get("key1"));
assertEquals(keyValueContainerData.getContainerDBType(),
containerData.getContainerDBType());
assertEquals(keyValueContainerData.getState(),
containerData.getState());
assertEquals(numberOfKeysToWrite,
containerData.getKeyCount());
assertEquals(keyValueContainerData.getLayOutVersion(),
containerData.getLayOutVersion());
assertEquals(keyValueContainerData.getMaxSize(),
containerData.getMaxSize());
assertEquals(keyValueContainerData.getBytesUsed(),
containerData.getBytesUsed());
//Can't overwrite existing container
try {
try (FileInputStream fis = new FileInputStream(folderToExport)) {
container.importContainerData(fis, packer);
}
fail("Container is imported twice. Previous files are overwritten");
} catch (IOException ex) {
//all good
assertTrue(container.getContainerFile().exists());
}
//Import failure should cleanup the container directory
containerData =
new KeyValueContainerData(containerId + 1,
keyValueContainerData.getLayOutVersion(),
keyValueContainerData.getMaxSize(), UUID.randomUUID().toString(),
datanodeId.toString());
container = new KeyValueContainer(containerData, CONF);
containerVolume = volumeChoosingPolicy.chooseVolume(volumeSet
.getVolumesList(), 1);
hddsVolumeDir = containerVolume.getHddsRootDir().toString();
container.populatePathFields(scmId, containerVolume, hddsVolumeDir);
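// Import from a stream that is already closed so the read fails partway.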
try {
FileInputStream fis = new FileInputStream(folderToExport);
fis.close();
container.importContainerData(fis, packer);
fail("Container import should fail");
} catch (Exception ex) {
assertTrue(ex instanceof IOException);
} finally {
File directory =
new File(container.getContainerData().getContainerPath());
assertFalse(directory.exists());
}
}
/**
* Create the container on disk.
*/
private void createContainer() throws StorageContainerException {
keyValueContainer.create(volumeSet, volumeChoosingPolicy, scmId);
keyValueContainerData = keyValueContainer.getContainerData();
}
/**
* Add some keys to the container.
*/
private void populate(long numberOfKeysToWrite) throws IOException {
try (ReferenceCountedDB metadataStore =
BlockUtils.getDB(keyValueContainer.getContainerData(), CONF)) {
Table<String, BlockData> blockDataTable =
metadataStore.getStore().getBlockDataTable();
for (long i = 0; i < numberOfKeysToWrite; i++) {
blockDataTable.put("test" + i, new BlockData(new BlockID(i, i)));
}
// When blocks are put through the normal write path, the block count
// is incremented and persisted to the DB automatically. This test
// writes to the block data table directly, so update the stored block
// count manually.
metadataStore.getStore().getMetadataTable()
.put(OzoneConsts.BLOCK_COUNT, numberOfKeysToWrite);
}
Map<String, String> metadata = new HashMap<>();
metadata.put("key1", "value1");
keyValueContainer.update(metadata, true);
}
/**
* Set container state to CLOSED.
*/
private void closeContainer() {
keyValueContainerData.setState(
ContainerProtos.ContainerDataProto.State.CLOSED);
}
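/**
 * Exports the same container from 20 threads at once to verify that
 * concurrent exports do not interfere with each other.
 */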
@Test
public void concurrentExport() throws Exception {
createContainer();
populate(100);
closeContainer();
AtomicReference<String> failed = new AtomicReference<>();
TarContainerPacker packer = new TarContainerPacker();
List<Thread> threads = IntStream.range(0, 20)
.mapToObj(i -> new Thread(() -> {
try {
File file = folder.newFile("concurrent" + i + ".tar.gz");
try (OutputStream out = new FileOutputStream(file)) {
keyValueContainer.exportContainerData(out, packer);
}
} catch (Exception e) {
failed.compareAndSet(null, e.getMessage());
}
}))
.collect(Collectors.toList());
threads.forEach(Thread::start);
for (Thread thread : threads) {
thread.join();
}
assertNull(failed.get());
}
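/**
 * Creating the same container twice must fail with
 * CONTAINER_ALREADY_EXISTS.
 */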
@Test
public void testDuplicateContainer() throws Exception {
try {
// Create Container.
keyValueContainer.create(volumeSet, volumeChoosingPolicy, scmId);
keyValueContainer.create(volumeSet, volumeChoosingPolicy, scmId);
fail("testDuplicateContainer failed");
} catch (StorageContainerException ex) {
GenericTestUtils.assertExceptionContains("ContainerFile already " +
"exists", ex);
assertEquals(ContainerProtos.Result.CONTAINER_ALREADY_EXISTS, ex
.getResult());
}
}
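/**
 * A volume choosing policy that reports no space left must surface as
 * DISK_OUT_OF_SPACE to the caller.
 */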
@Test
public void testDiskFullExceptionCreateContainer() throws Exception {
Mockito.when(volumeChoosingPolicy.chooseVolume(anyList(), anyLong()))
.thenThrow(DiskChecker.DiskOutOfSpaceException.class);
try {
keyValueContainer.create(volumeSet, volumeChoosingPolicy, scmId);
fail("testDiskFullExceptionCreateContainer failed");
} catch (StorageContainerException ex) {
GenericTestUtils.assertExceptionContains("disk out of space",
ex);
assertEquals(ContainerProtos.Result.DISK_OUT_OF_SPACE, ex.getResult());
}
}
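/**
 * Deleting a container must remove its directory, .container file and
 * container DB file from disk.
 */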
@Test
public void testDeleteContainer() throws Exception {
closeContainer();
keyValueContainer = new KeyValueContainer(
keyValueContainerData, CONF);
keyValueContainer.create(volumeSet, volumeChoosingPolicy, scmId);
keyValueContainer.delete();
String containerMetaDataPath = keyValueContainerData
.getMetadataPath();
File containerMetaDataLoc = new File(containerMetaDataPath);
assertFalse("Container directory still exists", containerMetaDataLoc
.getParentFile().exists());
assertFalse("Container File still exists",
keyValueContainer.getContainerFile().exists());
assertFalse("Container DB file still exists",
keyValueContainer.getContainerDBFile().exists());
}
@Test
public void testCloseContainer() throws Exception {
keyValueContainer.create(volumeSet, volumeChoosingPolicy, scmId);
keyValueContainer.close();
keyValueContainerData = keyValueContainer
.getContainerData();
assertEquals(ContainerProtos.ContainerDataProto.State.CLOSED,
keyValueContainerData.getState());
//Check state in the .container file
File containerFile = keyValueContainer.getContainerFile();
keyValueContainerData = (KeyValueContainerData) ContainerDataYaml
.readContainerFile(containerFile);
assertEquals(ContainerProtos.ContainerDataProto.State.CLOSED,
keyValueContainerData.getState());
}
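/**
 * Marking a container unhealthy must persist the UNHEALTHY state to the
 * .container file, and the container must still produce a report.
 */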
@Test
public void testReportOfUnhealthyContainer() throws Exception {
keyValueContainer.create(volumeSet, volumeChoosingPolicy, scmId);
Assert.assertNotNull(keyValueContainer.getContainerReport());
keyValueContainer.markContainerUnhealthy();
File containerFile = keyValueContainer.getContainerFile();
keyValueContainerData = (KeyValueContainerData) ContainerDataYaml
.readContainerFile(containerFile);
assertEquals(ContainerProtos.ContainerDataProto.State.UNHEALTHY,
keyValueContainerData.getState());
Assert.assertNotNull(keyValueContainer.getContainerReport());
}
@Test
public void testUpdateContainer() throws IOException {
keyValueContainer.create(volumeSet, volumeChoosingPolicy, scmId);
Map<String, String> metadata = new HashMap<>();
metadata.put(OzoneConsts.VOLUME, OzoneConsts.OZONE);
metadata.put(OzoneConsts.OWNER, OzoneConsts.OZONE_SIMPLE_HDFS_USER);
keyValueContainer.update(metadata, true);
keyValueContainerData = keyValueContainer
.getContainerData();
assertEquals(2, keyValueContainerData.getMetadata().size());
//Check metadata in the .container file
File containerFile = keyValueContainer.getContainerFile();
keyValueContainerData = (KeyValueContainerData) ContainerDataYaml
.readContainerFile(containerFile);
assertEquals(2, keyValueContainerData.getMetadata().size());
}
@Test
public void testUpdateContainerUnsupportedRequest() throws Exception {
try {
closeContainer();
keyValueContainer = new KeyValueContainer(keyValueContainerData, CONF);
keyValueContainer.create(volumeSet, volumeChoosingPolicy, scmId);
Map<String, String> metadata = new HashMap<>();
metadata.put(OzoneConsts.VOLUME, OzoneConsts.OZONE);
keyValueContainer.update(metadata, false);
fail("testUpdateContainerUnsupportedRequest failed");
} catch (StorageContainerException ex) {
GenericTestUtils.assertExceptionContains("Updating a closed container " +
"without force option is not allowed", ex);
assertEquals(ContainerProtos.Result.UNSUPPORTED_REQUEST, ex
.getResult());
}
}
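/**
 * Containers created with the same configuration object should reuse a
 * single cached RocksDB ColumnFamilyOptions instance instead of building
 * one per container.
 */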
@Test
public void testContainersShareColumnFamilyOptions() throws Exception {
// Get a read-only view (not a copy) of the options cache.
Map<ConfigurationSource, ColumnFamilyOptions> cachedOptions =
AbstractDatanodeStore.getColumnFamilyOptionsCache();
// Create Container 1
keyValueContainer.create(volumeSet, volumeChoosingPolicy, scmId);
Assert.assertEquals(1, cachedOptions.size());
ColumnFamilyOptions options1 = cachedOptions.get(CONF);
Assert.assertNotNull(options1);
// Create Container 2
keyValueContainerData = new KeyValueContainerData(2L,
layout,
(long) StorageUnit.GB.toBytes(5), UUID.randomUUID().toString(),
datanodeId.toString());
keyValueContainer = new KeyValueContainer(keyValueContainerData, CONF);
keyValueContainer.create(volumeSet, volumeChoosingPolicy, scmId);
Assert.assertEquals(1, cachedOptions.size());
ColumnFamilyOptions options2 = cachedOptions.get(CONF);
Assert.assertNotNull(options2);
// Column family options object should be reused.
Assert.assertSame(options1, options2);
}
}