| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.ozone.container; |
| |
| import java.util.ArrayList; |
| import java.util.List; |
| |
| import org.apache.hadoop.hdds.client.BlockID; |
| import org.apache.hadoop.hdds.conf.OzoneConfiguration; |
| import org.apache.hadoop.hdds.protocol.DatanodeDetails; |
| import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; |
| import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos |
| .ContainerCommandRequestProto; |
| import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos |
| .ContainerCommandResponseProto; |
| import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos |
| .ContainerType; |
| import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos |
| .DatanodeBlockID; |
| import org.apache.hadoop.hdds.scm.XceiverClientGrpc; |
| import org.apache.hadoop.hdds.scm.XceiverClientSpi; |
| import org.apache.hadoop.hdds.scm.pipeline.Pipeline; |
| import org.apache.hadoop.ozone.HddsDatanodeService; |
| import org.apache.hadoop.ozone.MiniOzoneCluster; |
| import org.apache.hadoop.ozone.container.common.helpers.BlockData; |
| import org.apache.hadoop.ozone.container.common.interfaces.Container; |
| import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine; |
| import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler; |
| import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; |
| import org.apache.hadoop.ozone.container.ozoneimpl.TestOzoneContainer; |
| import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand; |
| import org.apache.hadoop.test.GenericTestUtils; |
| |
| import static org.apache.hadoop.ozone.container.ozoneimpl.TestOzoneContainer |
| .writeChunkForContainer; |
| |
| import org.junit.After; |
| import org.junit.Assert; |
| import org.junit.Before; |
| import org.junit.Rule; |
| import org.junit.Test; |
| import org.junit.rules.Timeout; |
| |
| /** |
| * Tests ozone containers replication. |
| */ |
| public class TestContainerReplication { |
| /** |
| * Set the timeout for every test. |
| */ |
| @Rule |
| public Timeout testTimeout = new Timeout(300000); |
| |
| private OzoneConfiguration conf; |
| private MiniOzoneCluster cluster; |
| |
| @Before |
| public void setup() throws Exception { |
| conf = newOzoneConfiguration(); |
| cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(2) |
| .setRandomContainerPort(true).build(); |
| } |
| |
| @After |
| public void teardown() { |
| if (cluster != null) { |
| cluster.shutdown(); |
| } |
| } |
| |
| @Test |
| public void testContainerReplication() throws Exception { |
| //GIVEN |
| long containerId = 1L; |
| |
| cluster.waitForClusterToBeReady(); |
| |
| HddsDatanodeService firstDatanode = cluster.getHddsDatanodes().get(0); |
| |
| //copy from the first datanode |
| List<DatanodeDetails> sourceDatanodes = new ArrayList<>(); |
| sourceDatanodes.add(firstDatanode.getDatanodeDetails()); |
| |
| Pipeline sourcePipelines = |
| ContainerTestHelper.createPipeline(sourceDatanodes); |
| |
| //create a new client |
| XceiverClientSpi client = new XceiverClientGrpc(sourcePipelines, conf); |
| client.connect(); |
| |
| //New container for testing |
| TestOzoneContainer.createContainerForTesting(client, containerId); |
| |
| ContainerCommandRequestProto requestProto = |
| writeChunkForContainer(client, containerId, 1024); |
| |
| DatanodeBlockID blockID = requestProto.getWriteChunk().getBlockID(); |
| |
| // Put Block to the test container |
| ContainerCommandRequestProto putBlockRequest = ContainerTestHelper |
| .getPutBlockRequest(sourcePipelines, requestProto.getWriteChunk()); |
| |
| ContainerCommandResponseProto response = |
| client.sendCommand(putBlockRequest); |
| |
| Assert.assertNotNull(response); |
| Assert.assertEquals(ContainerProtos.Result.SUCCESS, response.getResult()); |
| |
| HddsDatanodeService destinationDatanode = |
| chooseDatanodeWithoutContainer(sourcePipelines, |
| cluster.getHddsDatanodes()); |
| |
| // Close the container |
| ContainerCommandRequestProto closeContainerRequest = ContainerTestHelper |
| .getCloseContainer(sourcePipelines, containerId); |
| response = client.sendCommand(closeContainerRequest); |
| Assert.assertNotNull(response); |
| Assert.assertEquals(ContainerProtos.Result.SUCCESS, response.getResult()); |
| |
| //WHEN: send the order to replicate the container |
| cluster.getStorageContainerManager().getScmNodeManager() |
| .addDatanodeCommand(destinationDatanode.getDatanodeDetails().getUuid(), |
| new ReplicateContainerCommand(containerId, |
| sourcePipelines.getNodes())); |
| |
| DatanodeStateMachine destinationDatanodeDatanodeStateMachine = |
| destinationDatanode.getDatanodeStateMachine(); |
| |
| //wait for the replication |
| GenericTestUtils.waitFor(() |
| -> destinationDatanodeDatanodeStateMachine.getSupervisor() |
| .getReplicationCounter() > 0, 1000, 20_000); |
| |
| OzoneContainer ozoneContainer = |
| destinationDatanodeDatanodeStateMachine.getContainer(); |
| |
| Container container = |
| ozoneContainer |
| .getContainerSet().getContainer(containerId); |
| |
| Assert.assertNotNull( |
| "Container is not replicated to the destination datanode", |
| container); |
| |
| Assert.assertNotNull( |
| "ContainerData of the replicated container is null", |
| container.getContainerData()); |
| |
| KeyValueHandler handler = (KeyValueHandler) ozoneContainer.getDispatcher() |
| .getHandler(ContainerType.KeyValueContainer); |
| |
| BlockData key = handler.getBlockManager() |
| .getBlock(container, BlockID.getFromProtobuf(blockID)); |
| |
| Assert.assertNotNull(key); |
| Assert.assertEquals(1, key.getChunks().size()); |
| Assert.assertEquals(requestProto.getWriteChunk().getChunkData(), |
| key.getChunks().get(0)); |
| } |
| |
| private HddsDatanodeService chooseDatanodeWithoutContainer(Pipeline pipeline, |
| List<HddsDatanodeService> dataNodes) { |
| for (HddsDatanodeService datanode : dataNodes) { |
| if (!pipeline.getNodes().contains(datanode.getDatanodeDetails())) { |
| return datanode; |
| } |
| } |
| throw new AssertionError( |
| "No datanode outside of the pipeline"); |
| } |
| |
| private static OzoneConfiguration newOzoneConfiguration() { |
| return new OzoneConfiguration(); |
| } |
| |
| } |