/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package org.apache.hadoop.ozone.recon;

import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.ONE;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType.RATIS;
import static org.apache.hadoop.hdds.scm.events.SCMEvents.CLOSE_CONTAINER;
import static org.apache.hadoop.ozone.container.ozoneimpl.TestOzoneContainer.runTestOzoneContainerViaDataNode;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

import java.util.Optional;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.XceiverClientGrpc;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerManager;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
import org.apache.hadoop.hdds.scm.server.OzoneStorageContainerManager;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.recon.scm.ReconNodeManager;
import org.apache.hadoop.ozone.recon.scm.ReconStorageContainerManagerFacade;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.LambdaTestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.Timeout;
import org.slf4j.event.Level;

/**
* Recon's passive SCM integration tests.
*/
public class TestReconAsPassiveScm {

/**
* Set a timeout for each test.
*/
@Rule
public Timeout timeout = Timeout.seconds(300);

private MiniOzoneCluster cluster = null;
private OzoneConfiguration conf;

@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();

@Before
public void init() throws Exception {
conf = new OzoneConfiguration();
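// Use short report intervals so container and pipeline reports reach SCM and Recon quickly.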
conf.set(HDDS_CONTAINER_REPORT_INTERVAL, "5s");
conf.set(HDDS_PIPELINE_REPORT_INTERVAL, "5s");
cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(3)
.includeRecon(true).build();
cluster.waitForClusterToBeReady();
}

@After
public void shutdown() {
if (cluster != null) {
cluster.shutdown();
}
}

@Test
public void testDatanodeRegistrationAndReports() throws Exception {
ReconStorageContainerManagerFacade reconScm =
(ReconStorageContainerManagerFacade)
cluster.getReconServer().getReconStorageContainerManager();
StorageContainerManager scm = cluster.getStorageContainerManager();
PipelineManager reconPipelineManager = reconScm.getPipelineManager();
PipelineManager scmPipelineManager = scm.getPipelineManager();
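// Wait until Recon has learned about the pipelines reported by the datanodes.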
LambdaTestUtils.await(60000, 5000,
() -> (reconPipelineManager.getPipelines().size() >= 4));
// Verify that Recon has all the pipelines known to SCM.
scmPipelineManager.getPipelines().forEach(p -> {
try {
assertNotNull(reconPipelineManager.getPipeline(p.getId()));
} catch (PipelineNotFoundException e) {
Assert.fail();
}
});
// Verify we can never create a pipeline in Recon.
LambdaTestUtils.intercept(UnsupportedOperationException.class,
"Trying to create pipeline in Recon, which is prohibited!",
() -> reconPipelineManager.createPipeline(RATIS, ONE));
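// No containers have been created in SCM yet.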
ContainerManager scmContainerManager = scm.getContainerManager();
assertTrue(scmContainerManager.getContainerIDs().isEmpty());
// Verify that all 3 datanodes are registered with Recon.
NodeManager reconNodeManager = reconScm.getScmNodeManager();
NodeManager scmNodeManager = scm.getScmNodeManager();
assertEquals(scmNodeManager.getAllNodes().size(),
reconNodeManager.getAllNodes().size());
// Create a container in SCM.
ContainerManager reconContainerManager = reconScm.getContainerManager();
ContainerInfo containerInfo =
scmContainerManager.allocateContainer(RATIS, ONE, "test");
long containerID = containerInfo.getContainerID();
Pipeline pipeline =
scmPipelineManager.getPipeline(containerInfo.getPipelineID());
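// Write to the container through a datanode so that it shows up in container reports.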
XceiverClientGrpc client = new XceiverClientGrpc(pipeline, conf);
runTestOzoneContainerViaDataNode(containerID, client);
// Verify Recon picked up the new container that was created.
assertEquals(scmContainerManager.getContainerIDs(),
reconContainerManager.getContainerIDs());
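// Capture ReconNodeManager logs: Recon is passive, so the close container
// command should be dropped rather than sent to a datanode.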
GenericTestUtils.LogCapturer logCapturer =
GenericTestUtils.LogCapturer.captureLogs(ReconNodeManager.LOG);
GenericTestUtils.setLogLevel(ReconNodeManager.LOG, Level.DEBUG);
reconScm.getEventQueue().fireEvent(CLOSE_CONTAINER,
containerInfo.containerID());
GenericTestUtils.waitFor(() -> logCapturer.getOutput()
.contains("Ignoring unsupported command closeContainerCommand"),
1000, 20000);
}

@Test
public void testReconRestart() throws Exception {
final OzoneStorageContainerManager reconScm =
cluster.getReconServer().getReconStorageContainerManager();
StorageContainerManager scm = cluster.getStorageContainerManager();
// Verify that neither SCM nor Recon has any containers yet, and wait for
// all 3 datanodes to register with Recon.
ContainerManager scmContainerManager = scm.getContainerManager();
assertTrue(scmContainerManager.getContainerIDs().isEmpty());
ContainerManager reconContainerManager = reconScm.getContainerManager();
assertTrue(reconContainerManager.getContainerIDs().isEmpty());
LambdaTestUtils.await(60000, 5000,
() -> (reconScm.getScmNodeManager().getAllNodes().size() == 3));
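// Stop Recon so that it misses the container creation and pipeline close that follow.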
cluster.stopRecon();
// Create container in SCM.
ContainerInfo containerInfo =
scmContainerManager.allocateContainer(RATIS, ONE, "test");
long containerID = containerInfo.getContainerID();
PipelineManager scmPipelineManager = scm.getPipelineManager();
Pipeline pipeline =
scmPipelineManager.getPipeline(containerInfo.getPipelineID());
XceiverClientGrpc client = new XceiverClientGrpc(pipeline, conf);
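// Write data to the container through a datanode.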
runTestOzoneContainerViaDataNode(containerID, client);
assertFalse(scmContainerManager.getContainerIDs().isEmpty());
// Close a pipeline that the new container does not belong to.
Optional<Pipeline> pipelineToClose = scmPipelineManager
.getPipelines(RATIS, ONE)
.stream()
.filter(p -> !p.getId().equals(containerInfo.getPipelineID()))
.findFirst();
assertTrue(pipelineToClose.isPresent());
scmPipelineManager.finalizeAndDestroyPipeline(pipelineToClose.get(), false);
// Start Recon
cluster.startRecon();
// Verify that Recon knows about all the nodes after restart (even before
// heartbeats have been received).
NodeManager reconNodeManager = reconScm.getScmNodeManager();
NodeManager scmNodeManager = scm.getScmNodeManager();
assertEquals(scmNodeManager.getAllNodes().size(),
reconNodeManager.getAllNodes().size());
// Verify that Recon picks up the container creation and pipeline close
// actions performed in SCM while it was down.
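// Recon was restarted, so fetch the current SCM facade instance.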
OzoneStorageContainerManager newReconScm =
cluster.getReconServer().getReconStorageContainerManager();
PipelineManager reconPipelineManager = newReconScm.getPipelineManager();
assertFalse(
reconPipelineManager.containsPipeline(pipelineToClose.get().getId()));
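// The container created while Recon was down should eventually appear in
// Recon's container manager.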
LambdaTestUtils.await(90000, 5000,
() -> (newReconScm.getContainerManager()
.exists(ContainerID.valueof(containerID))));
}
}