| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hdds.scm.pipeline; |
| |
| import com.google.common.base.Supplier; |
| import org.apache.hadoop.fs.FileUtil; |
| import org.apache.hadoop.hdds.HddsConfigKeys; |
| import org.apache.hadoop.hdds.client.RatisReplicationConfig; |
| import org.apache.hadoop.hdds.conf.OzoneConfiguration; |
| import org.apache.hadoop.hdds.protocol.DatanodeDetails; |
| import org.apache.hadoop.hdds.protocol.proto.HddsProtos; |
| import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; |
| import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos; |
| import org.apache.hadoop.hdds.scm.HddsTestUtils; |
| import org.apache.hadoop.hdds.scm.container.ContainerID; |
| import org.apache.hadoop.hdds.scm.container.ContainerInfo; |
| import org.apache.hadoop.hdds.scm.container.ContainerManager; |
| import org.apache.hadoop.hdds.scm.container.ContainerReplica; |
| import org.apache.hadoop.hdds.scm.container.MockNodeManager; |
| import org.apache.hadoop.hdds.scm.exceptions.SCMException; |
| import org.apache.hadoop.hdds.scm.ha.SCMHADBTransactionBuffer; |
| import org.apache.hadoop.hdds.scm.ha.SCMHADBTransactionBufferStub; |
| import org.apache.hadoop.hdds.scm.ha.SCMHAManagerStub; |
| import org.apache.hadoop.hdds.scm.ha.SCMContext; |
| import org.apache.hadoop.hdds.scm.ha.SCMServiceManager; |
| import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition; |
| import org.apache.hadoop.hdds.scm.node.NodeStatus; |
| import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager; |
| import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher; |
| import org.apache.hadoop.hdds.scm.server.StorageContainerManager; |
| import org.apache.hadoop.hdds.server.events.EventQueue; |
| import org.apache.hadoop.hdds.utils.db.DBStore; |
| import org.apache.hadoop.hdds.utils.db.DBStoreBuilder; |
| import org.apache.hadoop.hdds.utils.db.Table; |
| import org.apache.hadoop.metrics2.MetricsRecordBuilder; |
| import org.apache.hadoop.ozone.container.common.SCMTestUtils; |
| import org.apache.ozone.test.GenericTestUtils; |
| import org.apache.ratis.protocol.exceptions.NotLeaderException; |
| import org.junit.After; |
| import org.junit.Assert; |
| import org.junit.Before; |
| import org.junit.Test; |
| import org.slf4j.LoggerFactory; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Set; |
| import java.util.UUID; |
| import java.util.concurrent.TimeUnit; |
| import java.util.stream.Collectors; |
| |
| import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT; |
| import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT_DEFAULT; |
| import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT; |
| import static org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState.ALLOCATED; |
| import static org.apache.hadoop.test.MetricsAsserts.getLongCounter; |
| import static org.apache.hadoop.test.MetricsAsserts.getMetrics; |
| import static org.junit.Assert.assertTrue; |
| import static org.junit.Assert.fail; |
| |
| /** |
| * Tests for PipelineManagerImpl. |
| */ |
| public class TestPipelineManagerImpl { |
| private OzoneConfiguration conf; |
| private File testDir; |
| private DBStore dbStore; |
| private MockNodeManager nodeManager; |
| private int maxPipelineCount; |
| private SCMContext scmContext; |
| private SCMServiceManager serviceManager; |
| private StorageContainerManager scm; |
| |
| @Before |
| public void init() throws Exception { |
| conf = SCMTestUtils.getConf(); |
| testDir = GenericTestUtils.getTestDir( |
| TestPipelineManagerImpl.class.getSimpleName() + UUID.randomUUID()); |
| conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, |
| GenericTestUtils.getRandomizedTempPath()); |
| scm = HddsTestUtils.getScm(conf); |
| conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath()); |
| dbStore = DBStoreBuilder.createDBStore(conf, new SCMDBDefinition()); |
| nodeManager = new MockNodeManager(true, 20); |
| maxPipelineCount = nodeManager.getNodeCount( |
| HddsProtos.NodeOperationalState.IN_SERVICE, |
| HddsProtos.NodeState.HEALTHY) * |
| conf.getInt(OZONE_DATANODE_PIPELINE_LIMIT, |
| OZONE_DATANODE_PIPELINE_LIMIT_DEFAULT) / |
| HddsProtos.ReplicationFactor.THREE.getNumber(); |
| scmContext = new SCMContext.Builder().setIsInSafeMode(true) |
| .setLeader(true).setIsPreCheckComplete(true) |
| .setSCM(scm).build(); |
| serviceManager = new SCMServiceManager(); |
| } |
| |
| @After |
| public void cleanup() throws Exception { |
| if (dbStore != null) { |
| dbStore.close(); |
| } |
| FileUtil.fullyDelete(testDir); |
| } |
| |
| private PipelineManagerImpl createPipelineManager(boolean isLeader) |
| throws IOException { |
| return PipelineManagerImpl.newPipelineManager(conf, |
| SCMHAManagerStub.getInstance(isLeader), |
| new MockNodeManager(true, 20), |
| SCMDBDefinition.PIPELINES.getTable(dbStore), |
| new EventQueue(), |
| scmContext, |
| serviceManager); |
| } |
| |
| private PipelineManagerImpl createPipelineManager( |
| boolean isLeader, SCMHADBTransactionBuffer buffer) throws IOException { |
| return PipelineManagerImpl.newPipelineManager(conf, |
| SCMHAManagerStub.getInstance(isLeader, buffer), |
| new MockNodeManager(true, 20), |
| SCMDBDefinition.PIPELINES.getTable(dbStore), |
| new EventQueue(), |
| SCMContext.emptyContext(), |
| serviceManager); |
| } |
| |
| @Test |
| public void testCreatePipeline() throws Exception { |
| SCMHADBTransactionBuffer buffer1 = |
| new SCMHADBTransactionBufferStub(dbStore); |
| PipelineManagerImpl pipelineManager = |
| createPipelineManager(true, buffer1); |
| Assert.assertTrue(pipelineManager.getPipelines().isEmpty()); |
| Pipeline pipeline1 = pipelineManager.createPipeline( |
| RatisReplicationConfig.getInstance(ReplicationFactor.THREE)); |
| Assert.assertEquals(1, pipelineManager.getPipelines().size()); |
| Assert.assertTrue(pipelineManager.containsPipeline(pipeline1.getId())); |
| |
| Pipeline pipeline2 = pipelineManager.createPipeline( |
| RatisReplicationConfig.getInstance(ReplicationFactor.ONE)); |
| Assert.assertEquals(2, pipelineManager.getPipelines().size()); |
| Assert.assertTrue(pipelineManager.containsPipeline(pipeline2.getId())); |
| buffer1.close(); |
| pipelineManager.close(); |
| |
| SCMHADBTransactionBuffer buffer2 = |
| new SCMHADBTransactionBufferStub(dbStore); |
| PipelineManagerImpl pipelineManager2 = |
| createPipelineManager(true, buffer2); |
| // Should be able to load previous pipelines. |
| Assert.assertFalse(pipelineManager2.getPipelines().isEmpty()); |
| Assert.assertEquals(2, pipelineManager.getPipelines().size()); |
| Pipeline pipeline3 = pipelineManager2.createPipeline( |
| RatisReplicationConfig.getInstance(ReplicationFactor.THREE)); |
| buffer2.close(); |
| Assert.assertEquals(3, pipelineManager2.getPipelines().size()); |
| Assert.assertTrue(pipelineManager2.containsPipeline(pipeline3.getId())); |
| |
| pipelineManager2.close(); |
| } |
| |
| @Test |
| public void testCreatePipelineShouldFailOnFollower() throws Exception { |
| PipelineManagerImpl pipelineManager = createPipelineManager(false); |
| Assert.assertTrue(pipelineManager.getPipelines().isEmpty()); |
| try { |
| pipelineManager |
| .createPipeline(RatisReplicationConfig |
| .getInstance(ReplicationFactor.THREE)); |
| } catch (NotLeaderException ex) { |
| pipelineManager.close(); |
| return; |
| } |
| // Should not reach here. |
| Assert.fail(); |
| } |
| |
| @Test |
| public void testUpdatePipelineStates() throws Exception { |
| SCMHADBTransactionBuffer buffer = new SCMHADBTransactionBufferStub(dbStore); |
| PipelineManagerImpl pipelineManager = |
| createPipelineManager(true, buffer); |
| Table<PipelineID, Pipeline> pipelineStore = |
| SCMDBDefinition.PIPELINES.getTable(dbStore); |
| Pipeline pipeline = pipelineManager.createPipeline( |
| RatisReplicationConfig.getInstance(ReplicationFactor.THREE)); |
| Assert.assertEquals(1, pipelineManager.getPipelines().size()); |
| Assert.assertTrue(pipelineManager.containsPipeline(pipeline.getId())); |
| Assert.assertEquals(ALLOCATED, pipeline.getPipelineState()); |
| buffer.flush(); |
| Assert.assertEquals(ALLOCATED, |
| pipelineStore.get(pipeline.getId()).getPipelineState()); |
| PipelineID pipelineID = pipeline.getId(); |
| |
| pipelineManager.openPipeline(pipelineID); |
| pipelineManager.addContainerToPipeline(pipelineID, ContainerID.valueOf(1)); |
| Assert.assertTrue(pipelineManager |
| .getPipelines(RatisReplicationConfig |
| .getInstance(ReplicationFactor.THREE), |
| Pipeline.PipelineState.OPEN).contains(pipeline)); |
| buffer.flush(); |
| Assert.assertTrue(pipelineStore.get(pipeline.getId()).isOpen()); |
| |
| pipelineManager.deactivatePipeline(pipeline.getId()); |
| Assert.assertEquals(Pipeline.PipelineState.DORMANT, |
| pipelineManager.getPipeline(pipelineID).getPipelineState()); |
| buffer.flush(); |
| Assert.assertEquals(Pipeline.PipelineState.DORMANT, |
| pipelineStore.get(pipeline.getId()).getPipelineState()); |
| Assert.assertFalse(pipelineManager |
| .getPipelines(RatisReplicationConfig |
| .getInstance(ReplicationFactor.THREE), |
| Pipeline.PipelineState.OPEN).contains(pipeline)); |
| Assert.assertEquals(1, pipelineManager.getPipelineCount( |
| RatisReplicationConfig.getInstance(ReplicationFactor.THREE), |
| Pipeline.PipelineState.DORMANT)); |
| |
| pipelineManager.activatePipeline(pipeline.getId()); |
| Assert.assertTrue(pipelineManager |
| .getPipelines(RatisReplicationConfig |
| .getInstance(ReplicationFactor.THREE), |
| Pipeline.PipelineState.OPEN).contains(pipeline)); |
| Assert.assertEquals(1, pipelineManager.getPipelineCount( |
| RatisReplicationConfig.getInstance(ReplicationFactor.THREE), |
| Pipeline.PipelineState.OPEN)); |
| buffer.flush(); |
| Assert.assertTrue(pipelineStore.get(pipeline.getId()).isOpen()); |
| pipelineManager.close(); |
| } |
| |
| @Test |
| public void testOpenPipelineShouldFailOnFollower() throws Exception { |
| PipelineManagerImpl pipelineManager = createPipelineManager(true); |
| Pipeline pipeline = pipelineManager.createPipeline( |
| RatisReplicationConfig.getInstance(ReplicationFactor.THREE)); |
| Assert.assertEquals(1, pipelineManager.getPipelines().size()); |
| Assert.assertTrue(pipelineManager.containsPipeline(pipeline.getId())); |
| Assert.assertEquals(ALLOCATED, pipeline.getPipelineState()); |
| // Change to follower |
| assert pipelineManager.getScmhaManager() instanceof SCMHAManagerStub; |
| ((SCMHAManagerStub) pipelineManager.getScmhaManager()).setIsLeader(false); |
| try { |
| pipelineManager.openPipeline(pipeline.getId()); |
| } catch (NotLeaderException ex) { |
| pipelineManager.close(); |
| return; |
| } |
| // Should not reach here. |
| Assert.fail(); |
| } |
| |
| @Test |
| public void testActivatePipelineShouldFailOnFollower() throws Exception { |
| PipelineManagerImpl pipelineManager = createPipelineManager(true); |
| Pipeline pipeline = pipelineManager.createPipeline( |
| RatisReplicationConfig.getInstance(ReplicationFactor.THREE)); |
| Assert.assertEquals(1, pipelineManager.getPipelines().size()); |
| Assert.assertTrue(pipelineManager.containsPipeline(pipeline.getId())); |
| Assert.assertEquals(ALLOCATED, pipeline.getPipelineState()); |
| // Change to follower |
| assert pipelineManager.getScmhaManager() instanceof SCMHAManagerStub; |
| ((SCMHAManagerStub) pipelineManager.getScmhaManager()).setIsLeader(false); |
| try { |
| pipelineManager.activatePipeline(pipeline.getId()); |
| } catch (NotLeaderException ex) { |
| pipelineManager.close(); |
| return; |
| } |
| // Should not reach here. |
| Assert.fail(); |
| } |
| |
| @Test |
| public void testDeactivatePipelineShouldFailOnFollower() throws Exception { |
| PipelineManagerImpl pipelineManager = createPipelineManager(true); |
| Pipeline pipeline = pipelineManager.createPipeline( |
| RatisReplicationConfig.getInstance(ReplicationFactor.THREE)); |
| Assert.assertEquals(1, pipelineManager.getPipelines().size()); |
| Assert.assertTrue(pipelineManager.containsPipeline(pipeline.getId())); |
| Assert.assertEquals(ALLOCATED, pipeline.getPipelineState()); |
| // Change to follower |
| assert pipelineManager.getScmhaManager() instanceof SCMHAManagerStub; |
| ((SCMHAManagerStub) pipelineManager.getScmhaManager()).setIsLeader(false); |
| try { |
| pipelineManager.deactivatePipeline(pipeline.getId()); |
| } catch (NotLeaderException ex) { |
| pipelineManager.close(); |
| return; |
| } |
| // Should not reach here. |
| Assert.fail(); |
| } |
| |
| @Test |
| public void testRemovePipeline() throws Exception { |
| PipelineManagerImpl pipelineManager = createPipelineManager(true); |
| pipelineManager.setScmContext(scmContext); |
| // Create a pipeline |
| Pipeline pipeline = pipelineManager.createPipeline( |
| RatisReplicationConfig.getInstance(ReplicationFactor.THREE)); |
| Assert.assertEquals(1, pipelineManager.getPipelines().size()); |
| Assert.assertTrue(pipelineManager.containsPipeline(pipeline.getId())); |
| Assert.assertEquals(ALLOCATED, pipeline.getPipelineState()); |
| |
| // Open the pipeline |
| pipelineManager.openPipeline(pipeline.getId()); |
| ContainerManager containerManager = scm.getContainerManager(); |
| ContainerInfo containerInfo = HddsTestUtils. |
| getContainer(HddsProtos.LifeCycleState.CLOSED, pipeline.getId()); |
| ContainerID containerID = containerInfo.containerID(); |
| //Add Container to ContainerMap |
| containerManager.getContainerStateManager(). |
| addContainer(containerInfo.getProtobuf()); |
| //Add Container to PipelineStateMap |
| pipelineManager.addContainerToPipeline(pipeline.getId(), containerID); |
| Assert.assertTrue(pipelineManager |
| .getPipelines(RatisReplicationConfig |
| .getInstance(ReplicationFactor.THREE), |
| Pipeline.PipelineState.OPEN).contains(pipeline)); |
| |
| try { |
| pipelineManager.removePipeline(pipeline); |
| fail(); |
| } catch (IOException ioe) { |
| // Should not be able to remove the OPEN pipeline. |
| Assert.assertEquals(1, pipelineManager.getPipelines().size()); |
| } catch (Exception e) { |
| Assert.fail("Should not reach here."); |
| } |
| |
| // Destroy pipeline |
| pipelineManager.closePipeline(pipeline, false); |
| try { |
| pipelineManager.getPipeline(pipeline.getId()); |
| fail("Pipeline should not have been retrieved"); |
| } catch (PipelineNotFoundException e) { |
| // There may be pipelines created by BackgroundPipelineCreator |
| // exist in pipelineManager, just ignore them. |
| } |
| |
| pipelineManager.close(); |
| } |
| |
| @Test |
| public void testClosePipelineShouldFailOnFollower() throws Exception { |
| PipelineManagerImpl pipelineManager = createPipelineManager(true); |
| pipelineManager.setScmContext(scmContext); |
| Pipeline pipeline = pipelineManager.createPipeline( |
| RatisReplicationConfig.getInstance(ReplicationFactor.THREE)); |
| Assert.assertEquals(1, pipelineManager.getPipelines().size()); |
| Assert.assertTrue(pipelineManager.containsPipeline(pipeline.getId())); |
| Assert.assertEquals(ALLOCATED, pipeline.getPipelineState()); |
| // Change to follower |
| assert pipelineManager.getScmhaManager() instanceof SCMHAManagerStub; |
| ((SCMHAManagerStub) pipelineManager.getScmhaManager()).setIsLeader(false); |
| try { |
| pipelineManager.closePipeline(pipeline, false); |
| } catch (NotLeaderException ex) { |
| pipelineManager.close(); |
| return; |
| } |
| // Should not reach here. |
| Assert.fail(); |
| } |
| |
| @Test |
| public void testPipelineReport() throws Exception { |
| PipelineManagerImpl pipelineManager = createPipelineManager(true); |
| pipelineManager.setScmContext(scmContext); |
| SCMSafeModeManager scmSafeModeManager = |
| new SCMSafeModeManager(conf, new ArrayList<>(), null, pipelineManager, |
| new EventQueue(), serviceManager, scmContext); |
| Pipeline pipeline = pipelineManager |
| .createPipeline(RatisReplicationConfig |
| .getInstance(ReplicationFactor.THREE)); |
| |
| // pipeline is not healthy until all dns report |
| List<DatanodeDetails> nodes = pipeline.getNodes(); |
| Assert.assertFalse( |
| pipelineManager.getPipeline(pipeline.getId()).isHealthy()); |
| // get pipeline report from each dn in the pipeline |
| PipelineReportHandler pipelineReportHandler = |
| new PipelineReportHandler(scmSafeModeManager, pipelineManager, |
| SCMContext.emptyContext(), conf); |
| nodes.subList(0, 2).forEach(dn -> sendPipelineReport(dn, pipeline, |
| pipelineReportHandler, false)); |
| sendPipelineReport(nodes.get(nodes.size() - 1), pipeline, |
| pipelineReportHandler, true); |
| |
| // pipeline is healthy when all dns report |
| Assert |
| .assertTrue(pipelineManager.getPipeline(pipeline.getId()).isHealthy()); |
| // pipeline should now move to open state |
| Assert |
| .assertTrue(pipelineManager.getPipeline(pipeline.getId()).isOpen()); |
| |
| // close the pipeline |
| pipelineManager.closePipeline(pipeline, false); |
| |
| // pipeline report for destroyed pipeline should be ignored |
| nodes.subList(0, 2).forEach(dn -> sendPipelineReport(dn, pipeline, |
| pipelineReportHandler, false)); |
| sendPipelineReport(nodes.get(nodes.size() - 1), pipeline, |
| pipelineReportHandler, true); |
| |
| try { |
| pipelineManager.getPipeline(pipeline.getId()); |
| fail("Pipeline should not have been retrieved"); |
| } catch (PipelineNotFoundException e) { |
| // should reach here |
| } |
| |
| // clean up |
| pipelineManager.close(); |
| } |
| |
| @Test |
| public void testPipelineCreationFailedMetric() throws Exception { |
| PipelineManagerImpl pipelineManager = createPipelineManager(true); |
| |
| // No pipeline at start |
| MetricsRecordBuilder metrics = getMetrics( |
| SCMPipelineMetrics.class.getSimpleName()); |
| long numPipelineAllocated = getLongCounter("NumPipelineAllocated", |
| metrics); |
| Assert.assertEquals(0, numPipelineAllocated); |
| |
| // 3 DNs are unhealthy. |
| // Create 5 pipelines (Use up 15 Datanodes) |
| |
| for (int i = 0; i < maxPipelineCount; i++) { |
| Pipeline pipeline = pipelineManager |
| .createPipeline(RatisReplicationConfig |
| .getInstance(ReplicationFactor.THREE)); |
| Assert.assertNotNull(pipeline); |
| } |
| |
| metrics = getMetrics( |
| SCMPipelineMetrics.class.getSimpleName()); |
| numPipelineAllocated = getLongCounter("NumPipelineAllocated", metrics); |
| Assert.assertEquals(maxPipelineCount, numPipelineAllocated); |
| |
| long numPipelineCreateFailed = getLongCounter( |
| "NumPipelineCreationFailed", metrics); |
| Assert.assertEquals(0, numPipelineCreateFailed); |
| |
| //This should fail... |
| try { |
| pipelineManager |
| .createPipeline(RatisReplicationConfig |
| .getInstance(ReplicationFactor.THREE)); |
| fail(); |
| } catch (SCMException ioe) { |
| // pipeline creation failed this time. |
| Assert.assertEquals(SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE, |
| ioe.getResult()); |
| } |
| |
| metrics = getMetrics( |
| SCMPipelineMetrics.class.getSimpleName()); |
| numPipelineAllocated = getLongCounter("NumPipelineAllocated", metrics); |
| Assert.assertEquals(maxPipelineCount, numPipelineAllocated); |
| |
| numPipelineCreateFailed = getLongCounter( |
| "NumPipelineCreationFailed", metrics); |
| Assert.assertEquals(1, numPipelineCreateFailed); |
| |
| // clean up |
| pipelineManager.close(); |
| } |
| |
| @Test |
| public void testPipelineOpenOnlyWhenLeaderReported() throws Exception { |
| SCMHADBTransactionBuffer buffer1 = |
| new SCMHADBTransactionBufferStub(dbStore); |
| PipelineManagerImpl pipelineManager = |
| createPipelineManager(true, buffer1); |
| |
| Pipeline pipeline = pipelineManager |
| .createPipeline(RatisReplicationConfig |
| .getInstance(ReplicationFactor.THREE)); |
| // close manager |
| buffer1.close(); |
| pipelineManager.close(); |
| // new pipeline manager loads the pipelines from the db in ALLOCATED state |
| pipelineManager = createPipelineManager(true); |
| Assert.assertEquals(Pipeline.PipelineState.ALLOCATED, |
| pipelineManager.getPipeline(pipeline.getId()).getPipelineState()); |
| |
| SCMSafeModeManager scmSafeModeManager = |
| new SCMSafeModeManager(new OzoneConfiguration(), new ArrayList<>(), |
| null, pipelineManager, new EventQueue(), |
| serviceManager, scmContext); |
| PipelineReportHandler pipelineReportHandler = |
| new PipelineReportHandler(scmSafeModeManager, pipelineManager, |
| SCMContext.emptyContext(), conf); |
| |
| // Report pipelines with leaders |
| List<DatanodeDetails> nodes = pipeline.getNodes(); |
| Assert.assertEquals(3, nodes.size()); |
| // Send report for all but no leader |
| nodes.forEach(dn -> sendPipelineReport(dn, pipeline, pipelineReportHandler, |
| false)); |
| |
| Assert.assertEquals(Pipeline.PipelineState.ALLOCATED, |
| pipelineManager.getPipeline(pipeline.getId()).getPipelineState()); |
| |
| nodes.subList(0, 2).forEach(dn -> sendPipelineReport(dn, pipeline, |
| pipelineReportHandler, false)); |
| sendPipelineReport(nodes.get(nodes.size() - 1), pipeline, |
| pipelineReportHandler, true); |
| |
| Assert.assertEquals(Pipeline.PipelineState.OPEN, |
| pipelineManager.getPipeline(pipeline.getId()).getPipelineState()); |
| |
| pipelineManager.close(); |
| } |
| |
| @Test |
| public void testScrubPipeline() throws Exception { |
| // No timeout for pipeline scrubber. |
| conf.setTimeDuration( |
| OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT, -1, |
| TimeUnit.MILLISECONDS); |
| |
| PipelineManagerImpl pipelineManager = createPipelineManager(true); |
| pipelineManager.setScmContext(scmContext); |
| Pipeline pipeline = pipelineManager |
| .createPipeline(RatisReplicationConfig |
| .getInstance(ReplicationFactor.THREE)); |
| // At this point, pipeline is not at OPEN stage. |
| Assert.assertEquals(Pipeline.PipelineState.ALLOCATED, |
| pipeline.getPipelineState()); |
| |
| // pipeline should be seen in pipelineManager as ALLOCATED. |
| Assert.assertTrue(pipelineManager |
| .getPipelines(RatisReplicationConfig |
| .getInstance(ReplicationFactor.THREE), |
| Pipeline.PipelineState.ALLOCATED).contains(pipeline)); |
| pipelineManager |
| .scrubPipeline(RatisReplicationConfig |
| .getInstance(ReplicationFactor.THREE)); |
| |
| // pipeline should be scrubbed. |
| Assert.assertFalse(pipelineManager |
| .getPipelines(RatisReplicationConfig |
| .getInstance(ReplicationFactor.THREE), |
| Pipeline.PipelineState.ALLOCATED).contains(pipeline)); |
| |
| pipelineManager.close(); |
| } |
| |
| @Test |
| public void testScrubPipelineShouldFailOnFollower() throws Exception { |
| // No timeout for pipeline scrubber. |
| conf.setTimeDuration( |
| OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT, -1, |
| TimeUnit.MILLISECONDS); |
| |
| PipelineManagerImpl pipelineManager = createPipelineManager(true); |
| pipelineManager.setScmContext(scmContext); |
| Pipeline pipeline = pipelineManager |
| .createPipeline(RatisReplicationConfig |
| .getInstance(ReplicationFactor.THREE)); |
| // At this point, pipeline is not at OPEN stage. |
| Assert.assertEquals(Pipeline.PipelineState.ALLOCATED, |
| pipeline.getPipelineState()); |
| |
| // pipeline should be seen in pipelineManager as ALLOCATED. |
| Assert.assertTrue(pipelineManager |
| .getPipelines(RatisReplicationConfig |
| .getInstance(ReplicationFactor.THREE), |
| Pipeline.PipelineState.ALLOCATED).contains(pipeline)); |
| |
| // Change to follower |
| assert pipelineManager.getScmhaManager() instanceof SCMHAManagerStub; |
| ((SCMHAManagerStub) pipelineManager.getScmhaManager()).setIsLeader(false); |
| |
| try { |
| pipelineManager |
| .scrubPipeline(RatisReplicationConfig |
| .getInstance(ReplicationFactor.THREE)); |
| } catch (NotLeaderException ex) { |
| pipelineManager.close(); |
| return; |
| } |
| // Should not reach here. |
| Assert.fail(); |
| } |
| |
| @Test |
| public void testPipelineNotCreatedUntilSafeModePrecheck() throws Exception { |
| // No timeout for pipeline scrubber. |
| conf.setTimeDuration( |
| OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT, -1, |
| TimeUnit.MILLISECONDS); |
| |
| scmContext.updateSafeModeStatus( |
| new SCMSafeModeManager.SafeModeStatus(true, false)); |
| |
| PipelineManagerImpl pipelineManager = createPipelineManager(true); |
| try { |
| pipelineManager |
| .createPipeline(RatisReplicationConfig |
| .getInstance(ReplicationFactor.THREE)); |
| fail("Pipelines should not have been created"); |
| } catch (IOException e) { |
| // No pipeline is created. |
| Assert.assertTrue(pipelineManager.getPipelines().isEmpty()); |
| } |
| |
| // Ensure a pipeline of factor ONE can be created - no exceptions should be |
| // raised. |
| Pipeline pipeline = pipelineManager |
| .createPipeline(RatisReplicationConfig |
| .getInstance(ReplicationFactor.ONE)); |
| Assert.assertTrue(pipelineManager |
| .getPipelines(RatisReplicationConfig |
| .getInstance(ReplicationFactor.ONE)) |
| .contains(pipeline)); |
| |
| // Simulate safemode check exiting. |
| scmContext.updateSafeModeStatus( |
| new SCMSafeModeManager.SafeModeStatus(true, true)); |
| GenericTestUtils.waitFor(new Supplier<Boolean>() { |
| @Override |
| public Boolean get() { |
| return pipelineManager.getPipelines().size() != 0; |
| } |
| }, 100, 10000); |
| pipelineManager.close(); |
| } |
| |
| @Test |
| public void testSafeModeUpdatedOnSafemodeExit() throws Exception { |
| // No timeout for pipeline scrubber. |
| conf.setTimeDuration( |
| OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT, -1, |
| TimeUnit.MILLISECONDS); |
| |
| PipelineManagerImpl pipelineManager = createPipelineManager(true); |
| |
| scmContext.updateSafeModeStatus( |
| new SCMSafeModeManager.SafeModeStatus(true, false)); |
| Assert.assertTrue(pipelineManager.getSafeModeStatus()); |
| Assert.assertFalse(pipelineManager.isPipelineCreationAllowed()); |
| |
| // First pass pre-check as true, but safemode still on |
| // Simulate safemode check exiting. |
| scmContext.updateSafeModeStatus( |
| new SCMSafeModeManager.SafeModeStatus(true, true)); |
| Assert.assertTrue(pipelineManager.getSafeModeStatus()); |
| Assert.assertTrue(pipelineManager.isPipelineCreationAllowed()); |
| |
| // Then also turn safemode off |
| scmContext.updateSafeModeStatus( |
| new SCMSafeModeManager.SafeModeStatus(false, true)); |
| Assert.assertFalse(pipelineManager.getSafeModeStatus()); |
| Assert.assertTrue(pipelineManager.isPipelineCreationAllowed()); |
| pipelineManager.close(); |
| } |
| |
| @Test |
| public void testAddContainerWithClosedPipeline() throws Exception { |
| GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer. |
| captureLogs(LoggerFactory.getLogger(PipelineStateMap.class)); |
| SCMHADBTransactionBuffer buffer = new SCMHADBTransactionBufferStub(dbStore); |
| PipelineManagerImpl pipelineManager = |
| createPipelineManager(true, buffer); |
| Table<PipelineID, Pipeline> pipelineStore = |
| SCMDBDefinition.PIPELINES.getTable(dbStore); |
| Pipeline pipeline = pipelineManager.createPipeline( |
| RatisReplicationConfig |
| .getInstance(HddsProtos.ReplicationFactor.THREE)); |
| PipelineID pipelineID = pipeline.getId(); |
| pipelineManager.addContainerToPipeline(pipelineID, ContainerID.valueOf(1)); |
| pipelineManager.getStateManager().updatePipelineState( |
| pipelineID.getProtobuf(), HddsProtos.PipelineState.PIPELINE_CLOSED); |
| buffer.flush(); |
| Assert.assertTrue(pipelineStore.get(pipelineID).isClosed()); |
| pipelineManager.addContainerToPipelineSCMStart(pipelineID, |
| ContainerID.valueOf(2)); |
| assertTrue(logCapturer.getOutput().contains("Container " + |
| ContainerID.valueOf(2) + " in open state for pipeline=" + |
| pipelineID + " in closed state")); |
| } |
| |
| @Test |
| public void testPipelineCloseFlow() throws IOException { |
| GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer |
| .captureLogs(LoggerFactory.getLogger(PipelineManagerImpl.class)); |
| PipelineManagerImpl pipelineManager = createPipelineManager(true); |
| pipelineManager.setScmContext(scmContext); |
| Pipeline pipeline = pipelineManager.createPipeline( |
| RatisReplicationConfig |
| .getInstance(HddsProtos.ReplicationFactor.THREE)); |
| PipelineID pipelineID = pipeline.getId(); |
| ContainerManager containerManager = scm.getContainerManager(); |
| ContainerInfo containerInfo = HddsTestUtils. |
| getContainer(HddsProtos.LifeCycleState.CLOSED, pipelineID); |
| ContainerID containerID = containerInfo.containerID(); |
| //Add Container to ContainerMap |
| containerManager.getContainerStateManager(). |
| addContainer(containerInfo.getProtobuf()); |
| //Add Container to PipelineStateMap |
| pipelineManager.addContainerToPipeline(pipelineID, containerID); |
| pipelineManager.closePipeline(pipeline, false); |
| String containerExpectedOutput = "Container " + containerID + |
| " closed for pipeline=" + pipelineID; |
| String pipelineExpectedOutput = |
| "Pipeline " + pipeline + " moved to CLOSED state"; |
| String logOutput = logCapturer.getOutput(); |
| assertTrue(logOutput.contains(containerExpectedOutput)); |
| assertTrue(logOutput.contains(pipelineExpectedOutput)); |
| |
| int containerLogIdx = logOutput.indexOf(containerExpectedOutput); |
| int pipelineLogIdx = logOutput.indexOf(pipelineExpectedOutput); |
| assertTrue(containerLogIdx < pipelineLogIdx); |
| } |
| |
| @Test |
| public void testCreatePipelineForRead() throws IOException { |
| PipelineManager pipelineManager = createPipelineManager(true); |
| List<DatanodeDetails> dns = nodeManager |
| .getNodes(NodeStatus.inServiceHealthy()) |
| .stream() |
| .limit(3) |
| .collect(Collectors.toList()); |
| Set<ContainerReplica> replicas = createContainerReplicasList(dns); |
| Pipeline pipeline = pipelineManager.createPipelineForRead( |
| RatisReplicationConfig.getInstance(ReplicationFactor.THREE), replicas); |
| Assert.assertEquals(3, pipeline.getNodes().size()); |
| for (DatanodeDetails dn : pipeline.getNodes()) { |
| Assert.assertTrue(dns.contains(dn)); |
| } |
| } |
| |
| private Set<ContainerReplica> createContainerReplicasList( |
| List <DatanodeDetails> dns) { |
| Set<ContainerReplica> replicas = new HashSet<>(); |
| for (DatanodeDetails dn : dns) { |
| ContainerReplica r = ContainerReplica.newBuilder() |
| .setBytesUsed(1) |
| .setContainerID(ContainerID.valueOf(1)) |
| .setContainerState(StorageContainerDatanodeProtocolProtos |
| .ContainerReplicaProto.State.CLOSED) |
| .setKeyCount(1) |
| .setOriginNodeId(UUID.randomUUID()) |
| .setSequenceId(1) |
| .setReplicaIndex(0) |
| .setDatanodeDetails(dn) |
| .build(); |
| replicas.add(r); |
| } |
| return replicas; |
| } |
| |
| private void sendPipelineReport( |
| DatanodeDetails dn, Pipeline pipeline, |
| PipelineReportHandler pipelineReportHandler, |
| boolean isLeader) { |
| SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode report = |
| HddsTestUtils.getPipelineReportFromDatanode(dn, pipeline.getId(), |
| isLeader); |
| pipelineReportHandler.onMessage(report, new EventQueue()); |
| } |
| } |