/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.client.rpc;

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.client.ReplicationFactor;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.DatanodeRatisServerConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.ratis.conf.RatisClientConfig;
import org.apache.hadoop.hdds.scm.ScmConfig;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.ozone.HddsDatanodeService;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.client.ObjectStore;
import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.client.OzoneClientFactory;
import org.apache.hadoop.ozone.client.io.KeyOutputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.container.ContainerTestHelper;
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
import org.apache.hadoop.ozone.container.common.transport.server.ratis.ContainerStateMachine;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_COMMAND_STATUS_REPORT_INTERVAL;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;

/**
* Tests delete key operation with a slow follower in the datanode
* pipeline.
*/
public class TestDeleteWithSlowFollower {
private static MiniOzoneCluster cluster;
private static OzoneConfiguration conf;
private static OzoneClient client;
private static ObjectStore objectStore;
private static String volumeName;
private static String bucketName;
private static String path;
private static XceiverClientManager xceiverClientManager;
private static final int FACTOR_THREE_PIPELINE_COUNT = 1;
/**
* Create a MiniOzoneCluster for testing.
*
* @throws Exception
*/
@BeforeClass
public static void init() throws Exception {
conf = new OzoneConfiguration();
path = GenericTestUtils
.getTempPath(TestDeleteWithSlowFollower.class.getSimpleName());
File baseDir = new File(path);
baseDir.mkdirs();
conf.setTimeDuration(HDDS_CONTAINER_REPORT_INTERVAL, 200,
TimeUnit.MILLISECONDS);
conf.setInt(ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT, 1);
// Make the stale, dead and server failure timeouts higher so that a dead
// node is not detected at the SCM and the pipeline close action is never
// initiated early at the datanode during the test.
conf.setTimeDuration(HDDS_COMMAND_STATUS_REPORT_INTERVAL, 200,
TimeUnit.MILLISECONDS);
conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000,
TimeUnit.MILLISECONDS);
conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 1000,
TimeUnit.SECONDS);
conf.setTimeDuration(ScmConfigKeys.OZONE_SCM_DEADNODE_INTERVAL, 2000,
TimeUnit.SECONDS);
conf.setTimeDuration(OZONE_SCM_PIPELINE_DESTROY_TIMEOUT, 1000,
TimeUnit.SECONDS);
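// Keep the Ratis follower-slowness and no-leader timeouts very high so
// that the stopped follower does not trigger a pipeline close action,
// while keeping the server request/watch timeouts short.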
DatanodeRatisServerConfig ratisServerConfig =
conf.getObject(DatanodeRatisServerConfig.class);
ratisServerConfig.setFollowerSlownessTimeout(Duration.ofSeconds(1000));
ratisServerConfig.setNoLeaderTimeout(Duration.ofSeconds(1000));
ratisServerConfig.setRequestTimeOut(Duration.ofSeconds(3));
ratisServerConfig.setWatchTimeOut(Duration.ofSeconds(3));
conf.setFromObject(ratisServerConfig);
RatisClientConfig.RaftConfig raftClientConfig =
conf.getObject(RatisClientConfig.RaftConfig.class);
raftClientConfig.setRpcRequestTimeout(Duration.ofSeconds(3));
raftClientConfig.setRpcWatchRequestTimeout(Duration.ofSeconds(10));
conf.setFromObject(raftClientConfig);
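// Run the block deleting services frequently so that the delete
// propagates quickly once it is triggered.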
conf.setTimeDuration(OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL,
1, TimeUnit.SECONDS);
ScmConfig scmConfig = conf.getObject(ScmConfig.class);
scmConfig.setBlockDeletionInterval(Duration.ofSeconds(1));
conf.setFromObject(scmConfig);
DatanodeConfiguration datanodeConfiguration = conf.getObject(
DatanodeConfiguration.class);
datanodeConfiguration.setBlockDeletionInterval(Duration.ofMillis(100));
conf.setFromObject(datanodeConfiguration);
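// Give the Ratis client generous write/watch timeouts so that client
// writes do not time out while the pipeline is degraded.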
RatisClientConfig ratisClientConfig =
conf.getObject(RatisClientConfig.class);
ratisClientConfig.setWriteRequestTimeout(Duration.ofSeconds(30));
ratisClientConfig.setWatchRequestTimeout(Duration.ofSeconds(30));
conf.setFromObject(ratisClientConfig);
conf.setQuietMode(false);
int numOfDatanodes = 3;
cluster = MiniOzoneCluster.newBuilder(conf)
.setNumDatanodes(numOfDatanodes)
.setTotalPipelineNumLimit(
numOfDatanodes + FACTOR_THREE_PIPELINE_COUNT)
.setHbInterval(100)
.build();
cluster.waitForClusterToBeReady();
cluster.waitForPipelineTobeReady(HddsProtos.ReplicationFactor.THREE, 60000);
// The easiest way to create an open container is to create a key.
client = OzoneClientFactory.getRpcClient(conf);
objectStore = client.getObjectStore();
xceiverClientManager = new XceiverClientManager(conf);
volumeName = "testdeletewithslowfollower";
bucketName = volumeName;
objectStore.createVolume(volumeName);
objectStore.getVolume(volumeName).createBucket(bucketName);
}
/**
* Shut down the MiniOzoneCluster.
*/
@AfterClass
public static void shutdown() {
if (cluster != null) {
cluster.shutdown();
}
}
/**
* The test simulates a slow follower by first writing a key, thereby
* creating a container on 3 datanodes of the cluster. Then a follower
* datanode is shut down and a close container command is issued, so that
* the container gets closed on the leader and the remaining follower.
* The key is then deleted and the stopped node is started up again, so
* that it rejoins the ring and applies the transactions it missed by
* fetching the log entries from the leader. Until this follower catches
* up and its replica gets closed, the data must not be deleted from any
* of the nodes that hold the closed replica.
*/
@Test
public void testDeleteKeyWithSlowFollower() throws Exception {
String keyName = "ratis";
OzoneOutputStream key =
objectStore.getVolume(volumeName).getBucket(bucketName)
.createKey(keyName, 0, ReplicationType.RATIS,
ReplicationFactor.THREE, new HashMap<>());
byte[] testData = "ratis".getBytes(StandardCharsets.UTF_8);
// First write and flush creates a container in the datanode
key.write(testData);
key.flush();
KeyOutputStream groupOutputStream = (KeyOutputStream) key.getOutputStream();
List<OmKeyLocationInfo> locationInfoList =
groupOutputStream.getLocationInfoList();
Assert.assertEquals(1, locationInfoList.size());
OmKeyLocationInfo omKeyLocationInfo = locationInfoList.get(0);
long containerID = omKeyLocationInfo.getContainerID();
// A container is created on the datanode. Now figure out a follower node to
// kill/slow down.
HddsDatanodeService follower = null;
HddsDatanodeService leader = null;
List<Pipeline> pipelineList =
cluster.getStorageContainerManager().getPipelineManager()
.getPipelines(HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.THREE);
Assert.assertTrue(pipelineList.size() >= FACTOR_THREE_PIPELINE_COUNT);
Pipeline pipeline = pipelineList.get(0);
for (HddsDatanodeService dn : cluster.getHddsDatanodes()) {
if (ContainerTestHelper.isRatisFollower(dn, pipeline)) {
follower = dn;
} else if (ContainerTestHelper.isRatisLeader(dn, pipeline)) {
leader = dn;
}
}
Assert.assertNotNull(follower);
Assert.assertNotNull(leader);
// Ensure that the chosen follower is still a follower.
Assert.assertTrue(ContainerTestHelper.isRatisFollower(follower, pipeline));
// Shut down the follower node.
cluster.shutdownHddsDatanode(follower.getDatanodeDetails());
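// Write more data and close the key while the follower is down; the
// follower will replay these transactions from the leader's log when it
// rejoins.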
key.write(testData);
key.close();
// Now move the container to the CLOSED state on the datanodes.
XceiverClientSpi xceiverClient =
xceiverClientManager.acquireClient(pipeline);
ContainerProtos.ContainerCommandRequestProto.Builder request =
ContainerProtos.ContainerCommandRequestProto.newBuilder();
request.setDatanodeUuid(pipeline.getFirstNode().getUuidString());
request.setCmdType(ContainerProtos.Type.CloseContainer);
request.setContainerID(containerID);
request.setCloseContainer(
ContainerProtos.CloseContainerRequestProto.getDefaultInstance());
xceiverClient.sendCommand(request.build());
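// Keep a handle to the leader's state machine to track its
// readStateMachine metrics across the follower restart.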
ContainerStateMachine stateMachine =
(ContainerStateMachine) ContainerTestHelper
.getStateMachine(leader, pipeline);
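// Look up the key in the OM to find the block whose chunks we will
// verify on the datanodes.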
OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName).
setBucketName(bucketName).setType(HddsProtos.ReplicationType.RATIS)
.setFactor(HddsProtos.ReplicationFactor.THREE).setKeyName(keyName)
.build();
OmKeyInfo info = cluster.getOzoneManager().lookupKey(keyArgs);
BlockID blockID =
info.getKeyLocationVersions().get(0).getLocationList().get(0)
.getBlockID();
final DatanodeStateMachine dnStateMachine =
leader.getDatanodeStateMachine();
OzoneContainer ozoneContainer = dnStateMachine.getContainer();
KeyValueHandler keyValueHandler =
(KeyValueHandler) ozoneContainer.getDispatcher()
.getHandler(ContainerProtos.ContainerType.KeyValueContainer);
Container container =
ozoneContainer.getContainerSet().getContainer(blockID.getContainerID());
KeyValueContainerData containerData =
((KeyValueContainerData) container.getContainerData());
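// Capture the delete transaction id and the pending-deletion block count
// before the key delete is issued.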
long delTrxId = containerData.getDeleteTransactionId();
long numPendingDeletionBlocks = containerData.getNumPendingDeletionBlocks();
BlockData blockData =
keyValueHandler.getBlockManager().getBlock(container, blockID);
client.getObjectStore().getVolume(volumeName).getBucket(bucketName)
.deleteKey(keyName);
GenericTestUtils.waitFor(
() -> dnStateMachine.getCommandDispatcher()
.getDeleteBlocksCommandHandler().getInvocationCount() >= 1,
500, 100000);
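// The delete command has reached the leader: the container's delete
// transaction id and pending deletion count should both have increased.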
Assert.assertTrue(containerData.getDeleteTransactionId() > delTrxId);
Assert.assertTrue(
containerData.getNumPendingDeletionBlocks() > numPendingDeletionBlocks);
// Make sure the chunk was never deleted on the leader even though the
// deleteBlock handler was invoked.
try {
for (ContainerProtos.ChunkInfo chunkInfo : blockData.getChunks()) {
keyValueHandler.getChunkManager()
.readChunk(container, blockID, ChunkInfo.getFromProtoBuf(chunkInfo),
null);
}
} catch (IOException ioe) {
Assert.fail("Exception should not be thrown.");
}
long numReadStateMachineOps =
stateMachine.getMetrics().getNumReadStateMachineOps();
Assert.assertEquals(0,
stateMachine.getMetrics().getNumReadStateMachineFails());
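// Evict the leader's state machine cache so that the log entries
// replayed to the restarted follower have to be re-read via
// readStateMachine.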
stateMachine.evictStateMachineCache();
cluster.restartHddsDatanode(follower.getDatanodeDetails(), false);
// wait for the raft server to come up and join the ratis ring
Thread.sleep(10000);
// Make sure the readStateMachine call got triggered after the follower
// caught up
Assert.assertTrue(stateMachine.getMetrics().getNumReadStateMachineOps()
> numReadStateMachineOps);
Assert.assertEquals(0,
stateMachine.getMetrics().getNumReadStateMachineFails());
// wait for the chunk to get deleted now
Thread.sleep(10000);
for (HddsDatanodeService dn : cluster.getHddsDatanodes()) {
keyValueHandler =
(KeyValueHandler) dn.getDatanodeStateMachine().getContainer()
.getDispatcher()
.getHandler(ContainerProtos.ContainerType.KeyValueContainer);
// Make sure the chunk is now deleted on all the datanodes.
try {
for (ContainerProtos.ChunkInfo chunkInfo : blockData.getChunks()) {
keyValueHandler.getChunkManager().readChunk(container, blockID,
ChunkInfo.getFromProtoBuf(chunkInfo), null);
}
Assert.fail("Expected exception is not thrown");
} catch (IOException ioe) {
Assert.assertTrue(ioe instanceof StorageContainerException);
Assert.assertEquals(ContainerProtos.Result.UNABLE_TO_FIND_CHUNK,
((StorageContainerException) ioe).getResult());
}
}
}
}