/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.client.rpc;
import java.io.IOException;
import static java.nio.charset.StandardCharsets.UTF_8;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.client.RatisReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.DatanodeRatisServerConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.ratis.conf.RatisClientConfig;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.net.DNSToSwitchMapping;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.StaticMapping;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.client.ObjectStore;
import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.client.OzoneClientFactory;
import org.apache.hadoop.ozone.client.io.BlockOutputStreamEntry;
import org.apache.hadoop.ozone.client.io.KeyOutputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.container.ContainerTestHelper;
import org.apache.hadoop.ozone.container.TestHelper;
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.container.common.interfaces.DBHandle;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.THREE;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
import org.apache.ozone.test.tag.Flaky;
import org.junit.Assert;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
/**
* Tests Exception handling by Ozone Client.
*/
@Timeout(300)
public class TestFailureHandlingByClient {
private MiniOzoneCluster cluster;
private OzoneConfiguration conf;
private OzoneClient client;
private ObjectStore objectStore;
private int chunkSize;
private int blockSize;
private String volumeName;
private String bucketName;
private String keyString;
/**
 * Create a MiniOzoneCluster for testing.
 *
 * @throws Exception if cluster setup fails
 */
private void init() throws Exception {
conf = new OzoneConfiguration();
chunkSize = (int) OzoneConsts.MB;
blockSize = 4 * chunkSize;
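// Use a long stale-node interval so SCM does not react to the datanodes
// that are intentionally shut down during the tests.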
conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 100, TimeUnit.SECONDS);
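// Give the Ratis client generous (30 second) write and watch request
// timeouts.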
RatisClientConfig ratisClientConfig =
conf.getObject(RatisClientConfig.class);
ratisClientConfig.setWriteRequestTimeout(Duration.ofSeconds(30));
ratisClientConfig.setWatchRequestTimeout(Duration.ofSeconds(30));
conf.setFromObject(ratisClientConfig);
conf.setTimeDuration(
OzoneConfigKeys.DFS_RATIS_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_KEY,
1, TimeUnit.SECONDS);
conf.setBoolean(
OzoneConfigKeys.OZONE_NETWORK_TOPOLOGY_AWARE_READ_KEY, true);
conf.setInt(ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT, 2);
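// Use short Ratis server request/watch timeouts so pipeline failures are
// detected quickly.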
DatanodeRatisServerConfig ratisServerConfig =
conf.getObject(DatanodeRatisServerConfig.class);
ratisServerConfig.setRequestTimeOut(Duration.ofSeconds(3));
ratisServerConfig.setWatchTimeOut(Duration.ofSeconds(3));
conf.setFromObject(ratisServerConfig);
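// Keep the raw Raft client RPC request/watch timeouts short as well.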
RatisClientConfig.RaftConfig raftClientConfig =
conf.getObject(RatisClientConfig.RaftConfig.class);
raftClientConfig.setRpcRequestTimeout(Duration.ofSeconds(3));
raftClientConfig.setRpcWatchRequestTimeout(Duration.ofSeconds(3));
conf.setFromObject(raftClientConfig);
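// Disable the stream buffer flush delay so data is flushed to the datanodes
// as soon as it is written.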
OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
clientConfig.setStreamBufferFlushDelay(false);
conf.setFromObject(clientConfig);
conf.setQuietMode(false);
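// Register a static rack mapping for the local host so topology-aware reads
// can be resolved in the single-host mini cluster.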
conf.setClass(NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
StaticMapping.class, DNSToSwitchMapping.class);
StaticMapping.addNodeToRack(NetUtils.normalizeHostNames(
Collections.singleton(HddsUtils.getHostName(conf))).get(0),
"/rack1");
cluster = MiniOzoneCluster.newBuilder(conf)
.setNumDatanodes(10).setTotalPipelineNumLimit(15).build();
cluster.waitForClusterToBeReady();
// The easiest way to create an open container is to create a key.
client = OzoneClientFactory.getRpcClient(conf);
objectStore = client.getObjectStore();
keyString = UUID.randomUUID().toString();
volumeName = "datanodefailurehandlingtest";
bucketName = volumeName;
objectStore.createVolume(volumeName);
objectStore.getVolume(volumeName).createBucket(bucketName);
}
private void startCluster() throws Exception {
init();
}
/**
 * Shut down the MiniOzoneCluster.
 */
@AfterEach
public void shutdown() {
if (cluster != null) {
cluster.shutdown();
}
}
@Test
public void testBlockWritesWithDnFailures() throws Exception {
startCluster();
String keyName = UUID.randomUUID().toString();
OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0);
byte[] data = ContainerTestHelper.getFixedLengthString(
keyString, 2 * chunkSize + chunkSize / 2).getBytes(UTF_8);
key.write(data);
// get the name of a valid container
Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
KeyOutputStream groupOutputStream =
(KeyOutputStream) key.getOutputStream();
List<OmKeyLocationInfo> locationInfoList =
groupOutputStream.getLocationInfoList();
Assert.assertEquals(1, locationInfoList.size());
long containerId = locationInfoList.get(0).getContainerID();
ContainerInfo container = cluster.getStorageContainerManager()
.getContainerManager()
.getContainer(ContainerID.valueOf(containerId));
Pipeline pipeline =
cluster.getStorageContainerManager().getPipelineManager()
.getPipeline(container.getPipelineID());
List<DatanodeDetails> datanodes = pipeline.getNodes();
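// Shut down two of the three datanodes in the pipeline to simulate write
// failures.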
cluster.shutdownHddsDatanode(datanodes.get(0));
cluster.shutdownHddsDatanode(datanodes.get(1));
// The write will fail, but the exception is handled and the key length is
// updated correctly in OzoneManager once the stream is closed.
key.close();
// Look up the key and verify that OzoneManager recorded the full data size.
OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName)
.setBucketName(bucketName)
.setReplicationConfig(RatisReplicationConfig.getInstance(THREE))
.setKeyName(keyName)
.setRefreshPipeline(true)
.build();
OmKeyInfo keyInfo = cluster.getOzoneManager().lookupKey(keyArgs);
Assert.assertEquals(data.length, keyInfo.getDataSize());
validateData(keyName, data);
// Verify that the block information is updated correctly in the DB on
// failures
testBlockCountOnFailures(keyInfo);
}
/**
 * Test that blockData and container metadata (block count and used
 * bytes) are updated correctly when there is a write failure.
 * We can combine this test with {@link #testBlockWritesWithDnFailures()}
 * as that test also simulates a write failure and the client re-writes the
 * failed chunks to a new block.
 */
private void testBlockCountOnFailures(OmKeyInfo omKeyInfo) throws Exception {
// testBlockWritesWithDnFailures writes chunkSize * 2.5 bytes of data into
// the KeyOutputStream. Before the stream is closed, 2 of the DNs in the
// pipeline being written to are shut down. So the key will be written
// to 2 blocks, as at least the last 0.5 chunk would not be committed to the
// first block before the stream is closed.
/*
* There are 3 possible scenarios:
* 1. Block1 has 2 chunks and OMKeyInfo also has 2 chunks against this block
* => Block2 should have 1 chunk
* (2 chunks were written to Block1, committed and acknowledged by
* CommitWatcher)
* 2. Block1 has 1 chunk and OMKeyInfo has 1 chunk against this block
* => Block2 should have 2 chunks
* (Possibly 2 chunks were written but only 1 was committed to the
* block)
* 3. Block1 has 2 chunks but OMKeyInfo has only 1 chunk against this block
* => Block2 should have 2 chunks
* (This happens when the 2nd chunk has been committed to Block1 but
* not acknowledged by CommitWatcher before pipeline shutdown)
*/
// Get information about the first and second block (in different pipelines)
List<OmKeyLocationInfo> locationList = omKeyInfo.getLatestVersionLocations()
.getLocationList();
long containerId1 = locationList.get(0).getContainerID();
List<DatanodeDetails> block1DNs = locationList.get(0).getPipeline()
.getNodes();
long containerId2 = locationList.get(1).getContainerID();
List<DatanodeDetails> block2DNs = locationList.get(1).getPipeline()
.getNodes();
int block2ExpectedChunkCount;
if (locationList.get(0).getLength() == 2 * chunkSize) {
// Scenario 1
block2ExpectedChunkCount = 1;
} else {
// Scenario 2 or 3 (OMKeyInfo has only 1 chunk recorded against Block1)
block2ExpectedChunkCount = 2;
}
// For the first block, first 2 DNs in the pipeline are shutdown (to
// simulate a failure). It should have 1 or 2 chunks (depending on
// whether the DN CommitWatcher successfully acknowledged the 2nd chunk
// write or not). The 3rd chunk would not exist on the first pipeline as
// the pipeline would be closed before the last 0.5 chunk was committed
// to the block.
KeyValueContainerData containerData1 =
((KeyValueContainer) cluster.getHddsDatanode(block1DNs.get(2))
.getDatanodeStateMachine().getContainer().getContainerSet()
.getContainer(containerId1)).getContainerData();
try (DBHandle containerDb1 = BlockUtils.getDB(containerData1, conf)) {
BlockData blockData1 = containerDb1.getStore().getBlockDataTable().get(
containerData1.blockKey(locationList.get(0).getBlockID()
.getLocalID()));
// The first block could have 1 or 2 chunks of data
int block1NumChunks = blockData1.getChunks().size();
Assert.assertTrue(block1NumChunks >= 1);
Assert.assertEquals(chunkSize * block1NumChunks, blockData1.getSize());
Assert.assertEquals(1, containerData1.getBlockCount());
Assert.assertEquals(chunkSize * block1NumChunks,
containerData1.getBytesUsed());
}
// Verify that the second block has the remaining data (0.5 or 1.5 chunks,
// depending on the scenario)
KeyValueContainerData containerData2 =
((KeyValueContainer) cluster.getHddsDatanode(block2DNs.get(0))
.getDatanodeStateMachine().getContainer().getContainerSet()
.getContainer(containerId2)).getContainerData();
try (DBHandle containerDb2 = BlockUtils.getDB(containerData2, conf)) {
BlockData blockData2 = containerDb2.getStore().getBlockDataTable().get(
containerData2.blockKey(locationList.get(1).getBlockID()
.getLocalID()));
// The second block should have the expected number of chunks for the scenario
Assert.assertEquals(block2ExpectedChunkCount,
blockData2.getChunks().size());
Assert.assertEquals(1, containerData2.getBlockCount());
int expectedBlockSize;
if (block2ExpectedChunkCount == 1) {
expectedBlockSize = chunkSize / 2;
} else {
expectedBlockSize = chunkSize + chunkSize / 2;
}
Assert.assertEquals(expectedBlockSize, blockData2.getSize());
Assert.assertEquals(expectedBlockSize, containerData2.getBytesUsed());
}
}
@Test
public void testWriteSmallFile() throws Exception {
startCluster();
String keyName = UUID.randomUUID().toString();
OzoneOutputStream key =
createKey(keyName, ReplicationType.RATIS, 0);
String data = ContainerTestHelper
.getFixedLengthString(keyString, chunkSize / 2);
key.write(data.getBytes(UTF_8));
// get the name of a valid container
Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
KeyOutputStream keyOutputStream =
(KeyOutputStream) key.getOutputStream();
List<OmKeyLocationInfo> locationInfoList =
keyOutputStream.getLocationInfoList();
long containerId = locationInfoList.get(0).getContainerID();
BlockID blockId = locationInfoList.get(0).getBlockID();
ContainerInfo container =
cluster.getStorageContainerManager().getContainerManager()
.getContainer(ContainerID.valueOf(containerId));
Pipeline pipeline =
cluster.getStorageContainerManager().getPipelineManager()
.getPipeline(container.getPipelineID());
List<DatanodeDetails> datanodes = pipeline.getNodes();
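// Shut down two of the datanodes in the pipeline; the pending data will be
// rewritten to a new block when the stream is closed.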
cluster.shutdownHddsDatanode(datanodes.get(0));
cluster.shutdownHddsDatanode(datanodes.get(1));
key.close();
// close() hits AlreadyClosedException on the failed pipeline; the current
// stream is discarded and the remaining data is written to a new block
OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName)
.setBucketName(bucketName)
.setReplicationConfig(RatisReplicationConfig.getInstance(THREE))
.setKeyName(keyName)
.setRefreshPipeline(true)
.build();
OmKeyInfo keyInfo = cluster.getOzoneManager().lookupKey(keyArgs);
// Make sure a new block is written
Assert.assertNotEquals(
keyInfo.getLatestVersionLocations().getBlocksLatestVersionOnly().get(0)
.getBlockID(), blockId);
Assert.assertEquals(data.getBytes(UTF_8).length,
keyInfo.getDataSize());
validateData(keyName, data.getBytes(UTF_8));
}
@Test
public void testContainerExclusionWithClosedContainerException()
throws Exception {
startCluster();
String keyName = UUID.randomUUID().toString();
OzoneOutputStream key =
createKey(keyName, ReplicationType.RATIS, blockSize);
String data = ContainerTestHelper
.getFixedLengthString(keyString, chunkSize);
// get the name of a valid container
Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
KeyOutputStream keyOutputStream =
(KeyOutputStream) key.getOutputStream();
List<BlockOutputStreamEntry> streamEntryList =
keyOutputStream.getStreamEntries();
// Assert that 1 block will be preallocated
Assert.assertEquals(1, streamEntryList.size());
key.write(data.getBytes(UTF_8));
key.flush();
long containerId = streamEntryList.get(0).getBlockID().getContainerID();
BlockID blockId = streamEntryList.get(0).getBlockID();
List<Long> containerIdList = new ArrayList<>();
containerIdList.add(containerId);
// The wait below will fail the test if the container does not get closed
TestHelper
.waitForContainerClose(cluster, containerIdList.toArray(new Long[0]));
// This write will hit ClosedContainerException and the container will be
// added to the exclude list
key.write(data.getBytes(UTF_8));
key.flush();
Assert.assertTrue(keyOutputStream.getExcludeList().getContainerIds()
.contains(ContainerID.valueOf(containerId)));
Assert.assertTrue(
keyOutputStream.getExcludeList().getDatanodes().isEmpty());
Assert.assertTrue(
keyOutputStream.getExcludeList().getPipelineIds().isEmpty());
// close() flushes any remaining buffered data and commits the key
key.close();
OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName)
.setBucketName(bucketName)
.setReplicationConfig(RatisReplicationConfig.getInstance(THREE))
.setKeyName(keyName)
.setRefreshPipeline(true)
.build();
OmKeyInfo keyInfo = cluster.getOzoneManager().lookupKey(keyArgs);
// Make sure a new block is written
Assert.assertNotEquals(
keyInfo.getLatestVersionLocations().getBlocksLatestVersionOnly().get(0)
.getBlockID(), blockId);
Assert.assertEquals(2 * data.getBytes(UTF_8).length,
keyInfo.getDataSize());
validateData(keyName, data.concat(data).getBytes(UTF_8));
}
@Test
@Flaky("HDDS-3298")
public void testDatanodeExclusionWithMajorityCommit() throws Exception {
startCluster();
String keyName = UUID.randomUUID().toString();
OzoneOutputStream key =
createKey(keyName, ReplicationType.RATIS, blockSize);
String data = ContainerTestHelper
.getFixedLengthString(keyString, chunkSize);
// get the name of a valid container
Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
KeyOutputStream keyOutputStream =
(KeyOutputStream) key.getOutputStream();
List<BlockOutputStreamEntry> streamEntryList =
keyOutputStream.getStreamEntries();
// Assert that 1 block will be preallocated
Assert.assertEquals(1, streamEntryList.size());
key.write(data.getBytes(UTF_8));
key.flush();
long containerId = streamEntryList.get(0).getBlockID().getContainerID();
BlockID blockId = streamEntryList.get(0).getBlockID();
ContainerInfo container =
cluster.getStorageContainerManager().getContainerManager()
.getContainer(ContainerID.valueOf(containerId));
Pipeline pipeline =
cluster.getStorageContainerManager().getPipelineManager()
.getPipeline(container.getPipelineID());
List<DatanodeDetails> datanodes = pipeline.getNodes();
// Shut down 1 datanode. This makes sure that a 2-way commit happens for
// the next write ops.
cluster.shutdownHddsDatanode(datanodes.get(0));
key.write(data.getBytes(UTF_8));
key.write(data.getBytes(UTF_8));
key.flush();
Assert.assertTrue(keyOutputStream.getExcludeList().getDatanodes()
.contains(datanodes.get(0)));
Assert.assertTrue(
keyOutputStream.getExcludeList().getContainerIds().isEmpty());
Assert.assertTrue(
keyOutputStream.getExcludeList().getPipelineIds().isEmpty());
// close() flushes any remaining buffered data and commits the key
key.close();
OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName)
.setBucketName(bucketName)
.setReplicationConfig(RatisReplicationConfig.getInstance(THREE))
.setKeyName(keyName)
.setRefreshPipeline(true)
.build();
OmKeyInfo keyInfo = cluster.getOzoneManager().lookupKey(keyArgs);
// Make sure a new block is written
Assert.assertNotEquals(
keyInfo.getLatestVersionLocations().getBlocksLatestVersionOnly().get(0)
.getBlockID(), blockId);
Assert.assertEquals(3 * data.getBytes(UTF_8).length, keyInfo.getDataSize());
validateData(keyName, data.concat(data).concat(data).getBytes(UTF_8));
}
@Test
public void testPipelineExclusionWithPipelineFailure() throws Exception {
startCluster();
String keyName = UUID.randomUUID().toString();
OzoneOutputStream key =
createKey(keyName, ReplicationType.RATIS, blockSize);
String data = ContainerTestHelper
.getFixedLengthString(keyString, chunkSize);
// get the name of a valid container
Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
KeyOutputStream keyOutputStream =
(KeyOutputStream) key.getOutputStream();
List<BlockOutputStreamEntry> streamEntryList =
keyOutputStream.getStreamEntries();
// Assert that 1 block will be preallocated
Assert.assertEquals(1, streamEntryList.size());
key.write(data.getBytes(UTF_8));
key.flush();
long containerId = streamEntryList.get(0).getBlockID().getContainerID();
BlockID blockId = streamEntryList.get(0).getBlockID();
ContainerInfo container =
cluster.getStorageContainerManager().getContainerManager()
.getContainer(ContainerID.valueOf(containerId));
Pipeline pipeline =
cluster.getStorageContainerManager().getPipelineManager()
.getPipeline(container.getPipelineID());
List<DatanodeDetails> datanodes = pipeline.getNodes();
// With two nodes down, the next write will hit AlreadyClosedException and
// the pipeline will be added to the exclude list
cluster.shutdownHddsDatanode(datanodes.get(0));
cluster.shutdownHddsDatanode(datanodes.get(1));
key.write(data.getBytes(UTF_8));
key.write(data.getBytes(UTF_8));
key.flush();
Assert.assertTrue(keyOutputStream.getExcludeList().getPipelineIds()
.contains(pipeline.getId()));
Assert.assertTrue(
keyOutputStream.getExcludeList().getContainerIds().isEmpty());
Assert.assertTrue(
keyOutputStream.getExcludeList().getDatanodes().isEmpty());
// close() flushes any remaining buffered data and commits the key
key.close();
OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName)
.setBucketName(bucketName)
.setReplicationConfig(RatisReplicationConfig.getInstance(THREE))
.setKeyName(keyName)
.setRefreshPipeline(true)
.build();
OmKeyInfo keyInfo = cluster.getOzoneManager().lookupKey(keyArgs);
// Make sure a new block is written
Assert.assertNotEquals(
keyInfo.getLatestVersionLocations().getBlocksLatestVersionOnly().get(0)
.getBlockID(), blockId);
Assert.assertEquals(3 * data.getBytes(UTF_8).length, keyInfo.getDataSize());
validateData(keyName, data.concat(data).concat(data).getBytes(UTF_8));
}
private OzoneOutputStream createKey(String keyName, ReplicationType type,
long size) throws Exception {
return TestHelper
.createKey(keyName, type, size, objectStore, volumeName, bucketName);
}
private void validateData(String keyName, byte[] data) throws Exception {
TestHelper
.validateData(keyName, data, objectStore, volumeName, bucketName);
}
}