blob: 49545b5a66249be38c1553cbfc80b7e872461963 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.pipeline;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.TestUtils;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.MockNodeManager;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.metadata.PipelineIDCodec;
import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore;
import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStoreImpl;
import org.apache.hadoop.hdds.scm.node.NodeStatus;
import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode;
import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.hdds.utils.db.Table.KeyValue;
import org.apache.hadoop.hdds.utils.db.TableIterator;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.test.GenericTestUtils;
import com.google.common.base.Supplier;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT;
import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
import org.junit.After;
import org.junit.Assert;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import org.junit.Before;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.mockito.InOrder;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.slf4j.event.Level.INFO;
/**
* Test cases to verify PipelineManager.
*/
public class TestSCMPipelineManager {
private static MockNodeManager nodeManager;
private static File testDir;
private static OzoneConfiguration conf;
private static SCMMetadataStore scmMetadataStore;
@Before
public void setUp() throws Exception {
conf = new OzoneConfiguration();
conf.setInt(OZONE_DATANODE_PIPELINE_LIMIT, 1);
testDir = GenericTestUtils
.getTestDir(TestSCMPipelineManager.class.getSimpleName());
conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());
conf.setBoolean(HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false);
boolean folderExisted = testDir.exists() || testDir.mkdirs();
if (!folderExisted) {
throw new IOException("Unable to create test directory path");
}
nodeManager = new MockNodeManager(true, 20);
scmMetadataStore = new SCMMetadataStoreImpl(conf);
}
@After
public void cleanup() throws Exception {
scmMetadataStore.getStore().close();
FileUtil.fullyDelete(testDir);
}
@Test
public void testPipelineReload() throws IOException {
SCMPipelineManager pipelineManager =
new SCMPipelineManager(conf,
nodeManager,
scmMetadataStore.getPipelineTable(),
new EventQueue());
pipelineManager.allowPipelineCreation();
PipelineProvider mockRatisProvider =
new MockRatisPipelineProvider(nodeManager,
pipelineManager.getStateManager(), conf);
pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
mockRatisProvider);
int pipelineNum = 5;
Set<Pipeline> pipelines = new HashSet<>();
for (int i = 0; i < pipelineNum; i++) {
Pipeline pipeline = pipelineManager
.createPipeline(HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.THREE);
pipelineManager.openPipeline(pipeline.getId());
pipelines.add(pipeline);
}
pipelineManager.close();
// new pipeline manager should be able to load the pipelines from the db
pipelineManager =
new SCMPipelineManager(conf, nodeManager,
scmMetadataStore.getPipelineTable(), new EventQueue());
pipelineManager.allowPipelineCreation();
mockRatisProvider =
new MockRatisPipelineProvider(nodeManager,
pipelineManager.getStateManager(), conf);
pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
mockRatisProvider);
for (Pipeline p : pipelines) {
// After reload, pipelines should be in open state
Assert.assertTrue(pipelineManager.getPipeline(p.getId()).isOpen());
}
List<Pipeline> pipelineList =
pipelineManager.getPipelines(HddsProtos.ReplicationType.RATIS);
Assert.assertEquals(pipelines, new HashSet<>(pipelineList));
Set<Set<DatanodeDetails>> originalPipelines = pipelineList.stream()
.map(Pipeline::getNodeSet).collect(Collectors.toSet());
Set<Set<DatanodeDetails>> reloadedPipelineHash = pipelines.stream()
.map(Pipeline::getNodeSet).collect(Collectors.toSet());
Assert.assertEquals(reloadedPipelineHash, originalPipelines);
Assert.assertEquals(pipelineNum, originalPipelines.size());
// clean up
for (Pipeline pipeline : pipelines) {
pipelineManager.finalizeAndDestroyPipeline(pipeline, false);
}
pipelineManager.close();
}
@Test
public void testRemovePipeline() throws IOException {
SCMPipelineManager pipelineManager =
new SCMPipelineManager(conf, nodeManager,
scmMetadataStore.getPipelineTable(), new EventQueue());
pipelineManager.allowPipelineCreation();
PipelineProvider mockRatisProvider =
new MockRatisPipelineProvider(nodeManager,
pipelineManager.getStateManager(), conf);
pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
mockRatisProvider);
Pipeline pipeline = pipelineManager
.createPipeline(HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.THREE);
pipelineManager.openPipeline(pipeline.getId());
pipelineManager
.addContainerToPipeline(pipeline.getId(), ContainerID.valueof(1));
pipelineManager.finalizeAndDestroyPipeline(pipeline, false);
pipelineManager.close();
// new pipeline manager should not be able to load removed pipelines
pipelineManager =
new SCMPipelineManager(conf, nodeManager,
scmMetadataStore.getPipelineTable(), new EventQueue());
try {
pipelineManager.getPipeline(pipeline.getId());
fail("Pipeline should not have been retrieved");
} catch (IOException e) {
Assert.assertTrue(e.getMessage().contains("not found"));
}
// clean up
pipelineManager.close();
}
@Test
public void testPipelineReport() throws IOException {
EventQueue eventQueue = new EventQueue();
SCMPipelineManager pipelineManager =
new SCMPipelineManager(conf, nodeManager,
scmMetadataStore.getPipelineTable(), eventQueue);
pipelineManager.allowPipelineCreation();
PipelineProvider mockRatisProvider =
new MockRatisPipelineProvider(nodeManager,
pipelineManager.getStateManager(), conf);
pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
mockRatisProvider);
SCMSafeModeManager scmSafeModeManager =
new SCMSafeModeManager(conf, new ArrayList<>(), pipelineManager,
eventQueue);
// create a pipeline in allocated state with no dns yet reported
Pipeline pipeline = pipelineManager
.createPipeline(HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.THREE);
Assert
.assertFalse(pipelineManager.getPipeline(pipeline.getId()).isHealthy());
Assert
.assertFalse(pipelineManager.getPipeline(pipeline.getId()).isOpen());
// pipeline is not healthy until all dns report
List<DatanodeDetails> nodes = pipeline.getNodes();
Assert.assertFalse(
pipelineManager.getPipeline(pipeline.getId()).isHealthy());
// get pipeline report from each dn in the pipeline
PipelineReportHandler pipelineReportHandler =
new PipelineReportHandler(scmSafeModeManager, pipelineManager, conf);
nodes.subList(0, 2).forEach(dn -> sendPipelineReport(dn, pipeline,
pipelineReportHandler, false, eventQueue));
sendPipelineReport(nodes.get(nodes.size() - 1), pipeline,
pipelineReportHandler, true, eventQueue);
// pipeline is healthy when all dns report
Assert
.assertTrue(pipelineManager.getPipeline(pipeline.getId()).isHealthy());
// pipeline should now move to open state
Assert
.assertTrue(pipelineManager.getPipeline(pipeline.getId()).isOpen());
// close the pipeline
pipelineManager.finalizeAndDestroyPipeline(pipeline, false);
// pipeline report for destroyed pipeline should be ignored
nodes.subList(0, 2).forEach(dn -> sendPipelineReport(dn, pipeline,
pipelineReportHandler, false, eventQueue));
sendPipelineReport(nodes.get(nodes.size() - 1), pipeline,
pipelineReportHandler, true, eventQueue);
try {
pipelineManager.getPipeline(pipeline.getId());
fail("Pipeline should not have been retrieved");
} catch (IOException e) {
Assert.assertTrue(e.getMessage().contains("not found"));
}
// clean up
pipelineManager.close();
}
@Test
public void testPipelineCreationFailedMetric() throws Exception {
MockNodeManager nodeManagerMock = new MockNodeManager(true,
20);
SCMPipelineManager pipelineManager =
new SCMPipelineManager(conf, nodeManagerMock,
scmMetadataStore.getPipelineTable(), new EventQueue());
pipelineManager.allowPipelineCreation();
nodeManagerMock.setNumPipelinePerDatanode(1);
PipelineProvider mockRatisProvider =
new MockRatisPipelineProvider(nodeManagerMock,
pipelineManager.getStateManager(), conf);
pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
mockRatisProvider);
MetricsRecordBuilder metrics = getMetrics(
SCMPipelineMetrics.class.getSimpleName());
long numPipelineAllocated = getLongCounter("NumPipelineAllocated",
metrics);
Assert.assertEquals(0, numPipelineAllocated);
// 3 DNs are unhealthy.
// Create 5 pipelines (Use up 15 Datanodes)
for (int i = 0; i < 5; i++) {
Pipeline pipeline = pipelineManager
.createPipeline(HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.THREE);
Assert.assertNotNull(pipeline);
}
metrics = getMetrics(
SCMPipelineMetrics.class.getSimpleName());
numPipelineAllocated = getLongCounter("NumPipelineAllocated", metrics);
Assert.assertEquals(5, numPipelineAllocated);
long numPipelineCreateFailed = getLongCounter(
"NumPipelineCreationFailed", metrics);
Assert.assertEquals(0, numPipelineCreateFailed);
LogCapturer logs = LogCapturer.captureLogs(SCMPipelineManager.getLog());
GenericTestUtils.setLogLevel(SCMPipelineManager.getLog(), INFO);
//This should fail...
try {
pipelineManager.createPipeline(HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.THREE);
fail();
} catch (SCMException ioe) {
// pipeline creation failed this time.
Assert.assertEquals(SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE,
ioe.getResult());
Assert.assertFalse(logs.getOutput().contains(
"Failed to create pipeline of type"));
} finally {
logs.stopCapturing();
}
metrics = getMetrics(
SCMPipelineMetrics.class.getSimpleName());
numPipelineAllocated = getLongCounter("NumPipelineAllocated", metrics);
Assert.assertEquals(5, numPipelineAllocated);
numPipelineCreateFailed = getLongCounter(
"NumPipelineCreationFailed", metrics);
Assert.assertEquals(1, numPipelineCreateFailed);
// clean up
pipelineManager.close();
}
@Test
public void testPipelineLimit() throws Exception {
int numMetaDataVolumes = 2;
final OzoneConfiguration config = new OzoneConfiguration();
config.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());
config.setBoolean(HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION,
false);
// turning off this config will ensure, pipeline creation is determined by
// metadata volume count.
config.setInt(OZONE_DATANODE_PIPELINE_LIMIT, 0);
MockNodeManager nodeManagerMock = new MockNodeManager(true,
3);
nodeManagerMock.setNumMetaDataVolumes(numMetaDataVolumes);
int pipelinePerDn = numMetaDataVolumes *
MockNodeManager.NUM_PIPELINE_PER_METADATA_DISK;
nodeManagerMock.setNumPipelinePerDatanode(pipelinePerDn);
SCMPipelineManager pipelineManager =
new SCMPipelineManager(config, nodeManagerMock,
scmMetadataStore.getPipelineTable(), new EventQueue());
pipelineManager.allowPipelineCreation();
PipelineProvider mockRatisProvider =
new MockRatisPipelineProvider(nodeManagerMock,
pipelineManager.getStateManager(), config);
pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
mockRatisProvider);
MetricsRecordBuilder metrics = getMetrics(
SCMPipelineMetrics.class.getSimpleName());
long numPipelineAllocated = getLongCounter("NumPipelineAllocated",
metrics);
Assert.assertEquals(0, numPipelineAllocated);
// one node pipeline creation will not be accounted for
// pipeline limit determination
pipelineManager.createPipeline(HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.ONE);
// max limit on no of pipelines is 4
for (int i = 0; i < pipelinePerDn; i++) {
Pipeline pipeline = pipelineManager
.createPipeline(HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.THREE);
Assert.assertNotNull(pipeline);
}
metrics = getMetrics(
SCMPipelineMetrics.class.getSimpleName());
numPipelineAllocated = getLongCounter("NumPipelineAllocated", metrics);
Assert.assertEquals(5, numPipelineAllocated);
long numPipelineCreateFailed = getLongCounter(
"NumPipelineCreationFailed", metrics);
Assert.assertEquals(0, numPipelineCreateFailed);
//This should fail...
try {
pipelineManager.createPipeline(HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.THREE);
fail();
} catch (SCMException ioe) {
// pipeline creation failed this time.
Assert.assertEquals(SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE,
ioe.getResult());
}
metrics = getMetrics(
SCMPipelineMetrics.class.getSimpleName());
numPipelineAllocated = getLongCounter("NumPipelineAllocated", metrics);
Assert.assertEquals(5, numPipelineAllocated);
numPipelineCreateFailed = getLongCounter(
"NumPipelineCreationFailed", metrics);
Assert.assertEquals(1, numPipelineCreateFailed);
// clean up
pipelineManager.close();
}
@Test
public void testActivateDeactivatePipeline() throws IOException {
final SCMPipelineManager pipelineManager =
new SCMPipelineManager(conf, nodeManager,
scmMetadataStore.getPipelineTable(), new EventQueue());
pipelineManager.allowPipelineCreation();
final PipelineProvider mockRatisProvider =
new MockRatisPipelineProvider(nodeManager,
pipelineManager.getStateManager(), conf);
pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
mockRatisProvider);
final Pipeline pipeline = pipelineManager
.createPipeline(HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.THREE);
final PipelineID pid = pipeline.getId();
pipelineManager.openPipeline(pid);
pipelineManager.addContainerToPipeline(pid, ContainerID.valueof(1));
Assert.assertTrue(pipelineManager
.getPipelines(HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.THREE,
Pipeline.PipelineState.OPEN).contains(pipeline));
Assert.assertEquals(Pipeline.PipelineState.OPEN,
pipelineManager.getPipeline(pid).getPipelineState());
pipelineManager.deactivatePipeline(pid);
Assert.assertEquals(Pipeline.PipelineState.DORMANT,
pipelineManager.getPipeline(pid).getPipelineState());
Assert.assertFalse(pipelineManager
.getPipelines(HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.THREE,
Pipeline.PipelineState.OPEN).contains(pipeline));
pipelineManager.activatePipeline(pid);
Assert.assertTrue(pipelineManager
.getPipelines(HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.THREE,
Pipeline.PipelineState.OPEN).contains(pipeline));
pipelineManager.close();
}
@Test
public void testPipelineOpenOnlyWhenLeaderReported() throws Exception {
EventQueue eventQueue = new EventQueue();
SCMPipelineManager pipelineManager =
new SCMPipelineManager(conf, nodeManager,
scmMetadataStore.getPipelineTable(), eventQueue);
pipelineManager.allowPipelineCreation();
PipelineProvider mockRatisProvider =
new MockRatisPipelineProvider(nodeManager,
pipelineManager.getStateManager(), conf);
pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
mockRatisProvider);
pipelineManager.onMessage(
new SCMSafeModeManager.SafeModeStatus(true, true), null);
Pipeline pipeline = pipelineManager
.createPipeline(HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.THREE);
// close manager
pipelineManager.close();
// new pipeline manager loads the pipelines from the db in ALLOCATED state
pipelineManager =
new SCMPipelineManager(conf, nodeManager,
scmMetadataStore.getPipelineTable(), eventQueue);
mockRatisProvider =
new MockRatisPipelineProvider(nodeManager,
pipelineManager.getStateManager(), conf);
pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
mockRatisProvider);
Assert.assertEquals(Pipeline.PipelineState.ALLOCATED,
pipelineManager.getPipeline(pipeline.getId()).getPipelineState());
SCMSafeModeManager scmSafeModeManager =
new SCMSafeModeManager(new OzoneConfiguration(),
new ArrayList<>(), pipelineManager, eventQueue);
PipelineReportHandler pipelineReportHandler =
new PipelineReportHandler(scmSafeModeManager, pipelineManager, conf);
// Report pipelines with leaders
List<DatanodeDetails> nodes = pipeline.getNodes();
Assert.assertEquals(3, nodes.size());
// Send report for all but no leader
nodes.forEach(dn -> sendPipelineReport(dn, pipeline, pipelineReportHandler,
false, eventQueue));
Assert.assertEquals(Pipeline.PipelineState.ALLOCATED,
pipelineManager.getPipeline(pipeline.getId()).getPipelineState());
nodes.subList(0, 2).forEach(dn -> sendPipelineReport(dn, pipeline,
pipelineReportHandler, false, eventQueue));
sendPipelineReport(nodes.get(nodes.size() - 1), pipeline,
pipelineReportHandler, true, eventQueue);
Assert.assertEquals(Pipeline.PipelineState.OPEN,
pipelineManager.getPipeline(pipeline.getId()).getPipelineState());
pipelineManager.close();
}
@Test
public void testScrubPipeline() throws IOException {
// No timeout for pipeline scrubber.
conf.setTimeDuration(
OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT, -1,
TimeUnit.MILLISECONDS);
EventQueue eventQueue = new EventQueue();
final SCMPipelineManager pipelineManager =
new SCMPipelineManager(conf, nodeManager,
scmMetadataStore.getPipelineTable(), eventQueue);
pipelineManager.allowPipelineCreation();
final PipelineProvider ratisProvider = new MockRatisPipelineProvider(
nodeManager, pipelineManager.getStateManager(), conf, eventQueue,
false);
pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
ratisProvider);
Pipeline pipeline = pipelineManager
.createPipeline(HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.THREE);
// At this point, pipeline is not at OPEN stage.
Assert.assertEquals(Pipeline.PipelineState.ALLOCATED,
pipeline.getPipelineState());
// pipeline should be seen in pipelineManager as ALLOCATED.
Assert.assertTrue(pipelineManager
.getPipelines(HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.THREE,
Pipeline.PipelineState.ALLOCATED).contains(pipeline));
pipelineManager.scrubPipeline(HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.THREE);
// pipeline should be scrubbed.
Assert.assertFalse(pipelineManager
.getPipelines(HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.THREE,
Pipeline.PipelineState.ALLOCATED).contains(pipeline));
pipelineManager.close();
}
@Test
public void testPipelineNotCreatedUntilSafeModePrecheck()
throws IOException, TimeoutException, InterruptedException {
// No timeout for pipeline scrubber.
conf.setTimeDuration(
OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT, -1,
TimeUnit.MILLISECONDS);
EventQueue eventQueue = new EventQueue();
SCMPipelineManager pipelineManager =
new SCMPipelineManager(conf, nodeManager,
scmMetadataStore.getPipelineTable(), eventQueue);
final PipelineProvider ratisProvider = new MockRatisPipelineProvider(
nodeManager, pipelineManager.getStateManager(), conf, eventQueue,
false);
pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
ratisProvider);
try {
Pipeline pipeline = pipelineManager
.createPipeline(HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.THREE);
fail("Pipelines should not have been created");
} catch (IOException e) {
// expected
}
// Ensure a pipeline of factor ONE can be created - no exceptions should be
// raised.
Pipeline pipeline = pipelineManager
.createPipeline(HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.ONE);
// Simulate safemode check exiting.
pipelineManager.onMessage(
new SCMSafeModeManager.SafeModeStatus(true, true), null);
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
return pipelineManager.getPipelines().size() != 0;
}
}, 100, 10000);
pipelineManager.close();
}
@Test
public void testSafeModeUpdatedOnSafemodeExit()
throws IOException, TimeoutException, InterruptedException {
// No timeout for pipeline scrubber.
conf.setTimeDuration(
OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT, -1,
TimeUnit.MILLISECONDS);
EventQueue eventQueue = new EventQueue();
SCMPipelineManager pipelineManager =
new SCMPipelineManager(conf, nodeManager,
scmMetadataStore.getPipelineTable(), eventQueue);
final PipelineProvider ratisProvider = new MockRatisPipelineProvider(
nodeManager, pipelineManager.getStateManager(), conf, eventQueue,
false);
pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
ratisProvider);
assertEquals(true, pipelineManager.getSafeModeStatus());
assertEquals(false, pipelineManager.isPipelineCreationAllowed());
// First pass pre-check as true, but safemode still on
pipelineManager.onMessage(
new SCMSafeModeManager.SafeModeStatus(true, true), null);
assertEquals(true, pipelineManager.getSafeModeStatus());
assertEquals(true, pipelineManager.isPipelineCreationAllowed());
// Then also turn safemode off
pipelineManager.onMessage(
new SCMSafeModeManager.SafeModeStatus(false, true), null);
assertEquals(false, pipelineManager.getSafeModeStatus());
assertEquals(true, pipelineManager.isPipelineCreationAllowed());
pipelineManager.close();
}
/**
* This test was created for HDDS-3925 to check whether the db handling is
* proper at the SCMPipelineManager level. We should remove this test
* when we remove the key swap from the SCMPipelineManager code.
*
* The test emulates internally the values that the iterator will provide
* back to the check-fix code path. The iterator internally deserialize the
* key stored in RocksDB using the PipelineIDCodec. The older version of the
* codec serialized the PipelineIDs by taking the byte[] representation of
* the protobuf representation of the PipelineID, and deserialization was not
* implemented.
*
* In order to be able to check and fix the change, the deserialization was
* introduced, and deserialisation of the old protobuf byte representation
* with the new deserialization logic of the keys are
* checked against the PipelineID serialized in the value as well via
* protobuf.
* The DB is storing the keys now based on a byte[] serialized from the UUID
* inside the PipelineID.
* For this we emulate the getKey of the KeyValue returned by the
* iterator to return a PipelineID that is deserialized from the byte[]
* representation of the protobuf representation of the PipelineID in the
* test, as that would be the value we get from the iterator when iterating
* through a table with the old key format.
*
* @throws Exception when something goes wrong
*/
@Test
public void testPipelineDBKeyFormatChange() throws Exception {
Pipeline p1 = pipelineStub();
Pipeline p2 = pipelineStub();
Pipeline p3 = pipelineStub();
TableIterator<PipelineID, KeyValue<PipelineID, Pipeline>> iteratorMock =
mock(TableIterator.class);
KeyValue<PipelineID, Pipeline> kv1 =
mockKeyValueToProvideOldKeyFormat(p1);
KeyValue<PipelineID, Pipeline> kv2 =
mockKeyValueToProvideNormalFormat(p2);
KeyValue<PipelineID, Pipeline> kv3 =
mockKeyValueToProvideOldKeyFormat(p3);
when(iteratorMock.next())
.thenReturn(kv1, kv2, kv3)
.thenThrow(new NoSuchElementException());
when(iteratorMock.hasNext())
.thenReturn(true, true, true, false);
Table<PipelineID, Pipeline> pipelineStore = mock(Table.class);
doReturn(iteratorMock).when(pipelineStore).iterator();
when(pipelineStore.isEmpty()).thenReturn(false);
InOrder inorderVerifier = inOrder(pipelineStore, iteratorMock);
new SCMPipelineManager(conf, nodeManager, pipelineStore, new EventQueue());
inorderVerifier.verify(iteratorMock).removeFromDB();
inorderVerifier.verify(pipelineStore).put(p1.getId(), p1);
inorderVerifier.verify(iteratorMock).removeFromDB();
inorderVerifier.verify(pipelineStore).put(p3.getId(), p3);
verify(pipelineStore, never()).put(p2.getId(), p2);
}
@Test
public void testScmWithPipelineDBKeyFormatChange() throws Exception {
TemporaryFolder tempDir = new TemporaryFolder();
tempDir.create();
File dir = tempDir.newFolder();
conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, dir.getAbsolutePath());
SCMMetadataStore scmDbWithOldKeyFormat = null;
Map<UUID, Pipeline> oldPipelines = new HashMap<>();
try {
scmDbWithOldKeyFormat =
new TestSCMStoreImplWithOldPipelineIDKeyFormat(conf);
// Create 3 pipelines.
for (int i = 0; i < 3; i++) {
Pipeline pipeline = pipelineStub();
scmDbWithOldKeyFormat.getPipelineTable()
.put(pipeline.getId(), pipeline);
oldPipelines.put(pipeline.getId().getId(), pipeline);
}
} finally {
if (scmDbWithOldKeyFormat != null) {
scmDbWithOldKeyFormat.stop();
}
}
LogCapturer logCapturer =
LogCapturer.captureLogs(SCMPipelineManager.getLog());
// Create SCMPipelineManager with new DBDefinition.
SCMMetadataStore newScmMetadataStore = null;
try {
newScmMetadataStore = new SCMMetadataStoreImpl(conf);
SCMPipelineManager pipelineManager = new SCMPipelineManager(conf,
nodeManager,
newScmMetadataStore.getPipelineTable(),
new EventQueue());
waitForLog(logCapturer);
assertEquals(3, pipelineManager.getPipelines().size());
oldPipelines.values().forEach(p ->
pipelineManager.containsPipeline(p.getId()));
} finally {
newScmMetadataStore.stop();
}
// Mimicking another restart.
try {
logCapturer.clearOutput();
newScmMetadataStore = new SCMMetadataStoreImpl(conf);
SCMPipelineManager pipelineManager = new SCMPipelineManager(conf,
nodeManager,
newScmMetadataStore.getPipelineTable(),
new EventQueue());
try {
waitForLog(logCapturer);
Assert.fail("Unexpected log: " + logCapturer.getOutput());
} catch (TimeoutException ex) {
Assert.assertTrue(ex.getMessage().contains("Timed out"));
}
assertEquals(3, pipelineManager.getPipelines().size());
oldPipelines.values().forEach(p ->
pipelineManager.containsPipeline(p.getId()));
} finally {
newScmMetadataStore.stop();
}
}
private static void waitForLog(LogCapturer logCapturer)
throws TimeoutException, InterruptedException {
GenericTestUtils.waitFor(() -> logCapturer.getOutput()
.contains("Found pipeline in old format key"),
1000, 5000);
}
private Pipeline pipelineStub() {
return Pipeline.newBuilder()
.setId(PipelineID.randomId())
.setType(HddsProtos.ReplicationType.RATIS)
.setFactor(HddsProtos.ReplicationFactor.ONE)
.setState(Pipeline.PipelineState.OPEN)
.setNodes(
Arrays.asList(
nodeManager.getNodes(NodeStatus.inServiceHealthy()).get(0)
)
)
.setNodesInOrder(Arrays.asList(0))
.build();
}
private KeyValue<PipelineID, Pipeline>
mockKeyValueToProvideOldKeyFormat(Pipeline pipeline)
throws IOException {
KeyValue<PipelineID, Pipeline> kv = mock(KeyValue.class);
when(kv.getValue()).thenReturn(pipeline);
when(kv.getKey())
.thenReturn(
new PipelineIDCodec().fromPersistedFormat(
pipeline.getId().getProtobuf().toByteArray()
));
return kv;
}
private KeyValue<PipelineID, Pipeline>
mockKeyValueToProvideNormalFormat(Pipeline pipeline)
throws IOException {
KeyValue<PipelineID, Pipeline> kv = mock(KeyValue.class);
when(kv.getValue()).thenReturn(pipeline);
when(kv.getKey()).thenReturn(pipeline.getId());
return kv;
}
private void sendPipelineReport(DatanodeDetails dn,
Pipeline pipeline, PipelineReportHandler pipelineReportHandler,
boolean isLeader, EventQueue eventQueue) {
PipelineReportFromDatanode report =
TestUtils.getPipelineReportFromDatanode(dn, pipeline.getId(), isLeader);
pipelineReportHandler.onMessage(report, eventQueue);
}
}