blob: 524c3bdb4ad9a1352ee13b0ccdf919e6b1a195a3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.container;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ContainerCommandRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ContainerCommandResponseProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ContainerType;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.DatanodeBlockID;
import org.apache.hadoop.hdds.scm.XceiverClientGrpc;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.ozone.HddsDatanodeService;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.ozone.container.ozoneimpl.TestOzoneContainer;
import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
import org.apache.hadoop.test.GenericTestUtils;
import static org.apache.hadoop.ozone.container.ozoneimpl.TestOzoneContainer
.writeChunkForContainer;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
/**
* Tests ozone containers replication.
*/
public class TestContainerReplication {
/**
* Set the timeout for every test.
*/
@Rule
public Timeout testTimeout = new Timeout(300000);
private OzoneConfiguration conf;
private MiniOzoneCluster cluster;
@Before
public void setup() throws Exception {
conf = newOzoneConfiguration();
cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(2)
.setRandomContainerPort(true).build();
}
@After
public void teardown() {
if (cluster != null) {
cluster.shutdown();
}
}
@Test
public void testContainerReplication() throws Exception {
//GIVEN
long containerId = 1L;
cluster.waitForClusterToBeReady();
HddsDatanodeService firstDatanode = cluster.getHddsDatanodes().get(0);
//copy from the first datanode
List<DatanodeDetails> sourceDatanodes = new ArrayList<>();
sourceDatanodes.add(firstDatanode.getDatanodeDetails());
Pipeline sourcePipelines =
ContainerTestHelper.createPipeline(sourceDatanodes);
//create a new client
XceiverClientSpi client = new XceiverClientGrpc(sourcePipelines, conf);
client.connect();
//New container for testing
TestOzoneContainer.createContainerForTesting(client, containerId);
ContainerCommandRequestProto requestProto =
writeChunkForContainer(client, containerId, 1024);
DatanodeBlockID blockID = requestProto.getWriteChunk().getBlockID();
// Put Block to the test container
ContainerCommandRequestProto putBlockRequest = ContainerTestHelper
.getPutBlockRequest(sourcePipelines, requestProto.getWriteChunk());
ContainerCommandResponseProto response =
client.sendCommand(putBlockRequest);
Assert.assertNotNull(response);
Assert.assertEquals(ContainerProtos.Result.SUCCESS, response.getResult());
HddsDatanodeService destinationDatanode =
chooseDatanodeWithoutContainer(sourcePipelines,
cluster.getHddsDatanodes());
// Close the container
ContainerCommandRequestProto closeContainerRequest = ContainerTestHelper
.getCloseContainer(sourcePipelines, containerId);
response = client.sendCommand(closeContainerRequest);
Assert.assertNotNull(response);
Assert.assertEquals(ContainerProtos.Result.SUCCESS, response.getResult());
//WHEN: send the order to replicate the container
cluster.getStorageContainerManager().getScmNodeManager()
.addDatanodeCommand(destinationDatanode.getDatanodeDetails().getUuid(),
new ReplicateContainerCommand(containerId,
sourcePipelines.getNodes()));
DatanodeStateMachine destinationDatanodeDatanodeStateMachine =
destinationDatanode.getDatanodeStateMachine();
//wait for the replication
GenericTestUtils.waitFor(()
-> destinationDatanodeDatanodeStateMachine.getSupervisor()
.getReplicationCounter() > 0, 1000, 20_000);
OzoneContainer ozoneContainer =
destinationDatanodeDatanodeStateMachine.getContainer();
Container container =
ozoneContainer
.getContainerSet().getContainer(containerId);
Assert.assertNotNull(
"Container is not replicated to the destination datanode",
container);
Assert.assertNotNull(
"ContainerData of the replicated container is null",
container.getContainerData());
KeyValueHandler handler = (KeyValueHandler) ozoneContainer.getDispatcher()
.getHandler(ContainerType.KeyValueContainer);
BlockData key = handler.getBlockManager()
.getBlock(container, BlockID.getFromProtobuf(blockID));
Assert.assertNotNull(key);
Assert.assertEquals(1, key.getChunks().size());
Assert.assertEquals(requestProto.getWriteChunk().getChunkData(),
key.getChunks().get(0));
}
private HddsDatanodeService chooseDatanodeWithoutContainer(Pipeline pipeline,
List<HddsDatanodeService> dataNodes) {
for (HddsDatanodeService datanode : dataNodes) {
if (!pipeline.getNodes().contains(datanode.getDatanodeDetails())) {
return datanode;
}
}
throw new AssertionError(
"No datanode outside of the pipeline");
}
private static OzoneConfiguration newOzoneConfiguration() {
return new OzoneConfiguration();
}
}