| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with this |
| * work for additional information regarding copyright ownership. The ASF |
| * licenses this file to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| * License for the specific language governing permissions and limitations under |
| * the License. |
| * |
| */ |
| |
| package org.apache.hadoop.hdds.upgrade; |
| |
| import static java.lang.Thread.sleep; |
| import static java.nio.charset.StandardCharsets.UTF_8; |
| import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL; |
| import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.CLOSED; |
| import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.QUASI_CLOSED; |
| import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.HEALTHY; |
| import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.HEALTHY_READONLY; |
| import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT; |
| import static org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState.OPEN; |
| import static org.apache.hadoop.hdds.upgrade.HDDSLayoutFeature.INITIAL_VERSION; |
| import static org.apache.hadoop.ozone.upgrade.InjectedUpgradeFinalizationExecutor.UpgradeTestInjectionPoints.AFTER_COMPLETE_FINALIZATION; |
| import static org.apache.hadoop.ozone.upgrade.InjectedUpgradeFinalizationExecutor.UpgradeTestInjectionPoints.AFTER_POST_FINALIZE_UPGRADE; |
| import static org.apache.hadoop.ozone.upgrade.InjectedUpgradeFinalizationExecutor.UpgradeTestInjectionPoints.AFTER_PRE_FINALIZE_UPGRADE; |
| import static org.apache.hadoop.ozone.upgrade.InjectedUpgradeFinalizationExecutor.UpgradeTestInjectionPoints.BEFORE_PRE_FINALIZE_UPGRADE; |
| import static org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.ALREADY_FINALIZED; |
| import static org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.FINALIZATION_DONE; |
| import static org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.FINALIZATION_REQUIRED; |
| import static org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.STARTING_FINALIZATION; |
| |
import java.io.IOException;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
| |
| import org.apache.hadoop.hdds.client.ReplicationConfig; |
| import org.apache.hadoop.hdds.client.ReplicationFactor; |
| import org.apache.hadoop.hdds.client.ReplicationType; |
| import org.apache.hadoop.hdds.conf.OzoneConfiguration; |
| import org.apache.hadoop.hdds.protocol.DatanodeDetails; |
| import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; |
| import org.apache.hadoop.hdds.protocol.proto.HddsProtos; |
| import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState; |
| import org.apache.hadoop.hdds.scm.XceiverClientManager; |
| import org.apache.hadoop.hdds.scm.XceiverClientSpi; |
| import org.apache.hadoop.hdds.scm.container.ContainerInfo; |
| import org.apache.hadoop.hdds.scm.container.ContainerManagerV2; |
| import org.apache.hadoop.hdds.scm.exceptions.SCMException; |
| import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException; |
| import org.apache.hadoop.hdds.scm.pipeline.Pipeline; |
| import org.apache.hadoop.hdds.scm.pipeline.PipelineID; |
| import org.apache.hadoop.hdds.scm.pipeline.PipelineManager; |
| import org.apache.hadoop.hdds.scm.server.StorageContainerManager; |
| import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls; |
| import org.apache.hadoop.ozone.HddsDatanodeService; |
| import org.apache.hadoop.ozone.MiniOzoneCluster; |
| import org.apache.hadoop.ozone.client.ObjectStore; |
| import org.apache.hadoop.ozone.client.OzoneClient; |
| import org.apache.hadoop.ozone.client.OzoneClientFactory; |
| import org.apache.hadoop.ozone.client.io.OzoneOutputStream; |
| import org.apache.hadoop.ozone.container.common.interfaces.Container; |
| import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine; |
| import org.apache.hadoop.ozone.upgrade.BasicUpgradeFinalizer; |
| import org.apache.hadoop.ozone.upgrade.InjectedUpgradeFinalizationExecutor; |
| import org.apache.hadoop.ozone.upgrade.InjectedUpgradeFinalizationExecutor.UpgradeTestInjectionPoints; |
| import org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.StatusAndMessages; |
| import org.apache.hadoop.security.authentication.client.AuthenticationException; |
| import org.apache.ozone.test.GenericTestUtils; |
| import org.apache.ozone.test.LambdaTestUtils; |
| import org.junit.After; |
| import org.junit.Assert; |
| import org.junit.Before; |
| import org.junit.Ignore; |
| import org.junit.Rule; |
| import org.junit.Test; |
| import org.junit.rules.Timeout; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * Test SCM and DataNode Upgrade sequence. |
| */ |
| public class TestHDDSUpgrade { |
| |
| /** |
| * Set a timeout for each test. |
| */ |
| @Rule |
| public Timeout timeout = new Timeout(11000000); |
| private static final Logger LOG = |
| LoggerFactory.getLogger(TestHDDSUpgrade.class); |
| private static final int NUM_DATA_NODES = 3; |
| |
| private MiniOzoneCluster cluster; |
| private OzoneConfiguration conf; |
| private StorageContainerManager scm; |
| private ContainerManagerV2 scmContainerManager; |
| private PipelineManager scmPipelineManager; |
| private final int numContainersCreated = 1; |
| private HDDSLayoutVersionManager scmVersionManager; |
| private AtomicBoolean testPassed = new AtomicBoolean(true); |
| |
| private static final ReplicationConfig RATIS_THREE = |
| ReplicationConfig.fromTypeAndFactor(HddsProtos.ReplicationType.RATIS, |
| HddsProtos.ReplicationFactor.THREE); |
| |
| /** |
| * Create a MiniDFSCluster for testing. |
| * |
| * @throws IOException |
| */ |
| @Before |
| public void setUp() throws Exception { |
| init(); |
| } |
| |
| @After |
| public void tearDown() throws Exception { |
| shutdown(); |
| } |
| |
| public void init() throws Exception { |
| conf = new OzoneConfiguration(); |
| conf.setTimeDuration(HDDS_PIPELINE_REPORT_INTERVAL, 1000, |
| TimeUnit.MILLISECONDS); |
| conf.set(OZONE_DATANODE_PIPELINE_LIMIT, "1"); |
| cluster = MiniOzoneCluster.newBuilder(conf) |
| .setNumDatanodes(NUM_DATA_NODES) |
| // allow only one FACTOR THREE pipeline. |
| .setTotalPipelineNumLimit(NUM_DATA_NODES + 1) |
| .setHbInterval(500) |
| .setHbProcessorInterval(500) |
| .setScmLayoutVersion(INITIAL_VERSION.layoutVersion()) |
| .setDnLayoutVersion(INITIAL_VERSION.layoutVersion()) |
| .build(); |
| cluster.waitForClusterToBeReady(); |
| loadSCMState(); |
| } |
| |
| /** |
| * Shutdown MiniDFSCluster. |
| */ |
| public void shutdown() { |
| if (cluster != null) { |
| cluster.shutdown(); |
| } |
| } |
| |
| /* |
| * Some tests repeatedly modify the cluster. Helper function to reload the |
| * latest SCM state. |
| */ |
| private void loadSCMState(){ |
| scm = cluster.getStorageContainerManager(); |
| scmContainerManager = scm.getContainerManager(); |
| scmPipelineManager = scm.getPipelineManager(); |
| scmVersionManager = scm.getLayoutVersionManager(); |
| } |
| |
| |
| /* |
| * helper function to create a Key. |
| */ |
| private void createKey() throws IOException { |
| final String uniqueId = "testhddsupgrade"; |
| OzoneClient client = OzoneClientFactory.getRpcClient(conf); |
| ObjectStore objectStore = client.getObjectStore(); |
| objectStore.createVolume(uniqueId); |
| objectStore.getVolume(uniqueId).createBucket(uniqueId); |
| OzoneOutputStream key = |
| objectStore.getVolume(uniqueId).getBucket(uniqueId) |
| .createKey(uniqueId, 1024, ReplicationType.RATIS, |
| ReplicationFactor.THREE, new HashMap<>()); |
| key.write(uniqueId.getBytes(UTF_8)); |
| key.flush(); |
| key.close(); |
| } |
| |
| /* |
| * Helper function to test Pre-Upgrade conditions on the SCM |
| */ |
| private void testPreUpgradeConditionsSCM() { |
| Assert.assertEquals(INITIAL_VERSION.layoutVersion(), |
| scmVersionManager.getMetadataLayoutVersion()); |
| for (ContainerInfo ci : scmContainerManager.getContainers()) { |
| Assert.assertEquals(HddsProtos.LifeCycleState.OPEN, ci.getState()); |
| } |
| } |
| |
| /* |
| * Helper function to test Post-Upgrade conditions on the SCM |
| */ |
| private void testPostUpgradeConditionsSCM() { |
| loadSCMState(); |
| Assert.assertEquals(scmVersionManager.getSoftwareLayoutVersion(), |
| scmVersionManager.getMetadataLayoutVersion()); |
| Assert.assertTrue(scmVersionManager.getMetadataLayoutVersion() >= 1); |
| |
| // SCM should not return from finalization until there is at least one |
| // pipeline to use. |
| try { |
| GenericTestUtils.waitFor(() -> { |
| int pipelineCount = scmPipelineManager.getPipelines(RATIS_THREE, OPEN) |
| .size(); |
| if (pipelineCount >= 1) { |
| return true; |
| } |
| return false; |
| }, 500, 60000); |
| } catch (TimeoutException | InterruptedException e) { |
| Assert.fail("Timeout waiting for Upgrade to complete on SCM."); |
| } |
| |
| // SCM will not return from finalization until there is at least one |
| // RATIS 3 pipeline. For this to exist, all three of our datanodes must |
| // be in the HEALTHY state. |
| testDataNodesStateOnSCM(HEALTHY, HEALTHY_READONLY); |
| |
| int countContainers = 0; |
| for (ContainerInfo ci : scmContainerManager.getContainers()) { |
| HddsProtos.LifeCycleState ciState = ci.getState(); |
| LOG.info("testPostUpgradeConditionsSCM: container state is {}", |
| ciState.name()); |
| Assert.assertTrue((ciState == HddsProtos.LifeCycleState.CLOSED) || |
| (ciState == HddsProtos.LifeCycleState.CLOSING) || |
| (ciState == HddsProtos.LifeCycleState.DELETING) || |
| (ciState == HddsProtos.LifeCycleState.DELETED) || |
| (ciState == HddsProtos.LifeCycleState.QUASI_CLOSED)); |
| countContainers++; |
| } |
| Assert.assertTrue(countContainers >= numContainersCreated); |
| } |
| |
| /* |
| * Helper function to test Pre-Upgrade conditions on all the DataNodes. |
| */ |
| private void testPreUpgradeConditionsDataNodes() { |
| for (HddsDatanodeService dataNode : cluster.getHddsDatanodes()) { |
| DatanodeStateMachine dsm = dataNode.getDatanodeStateMachine(); |
| HDDSLayoutVersionManager dnVersionManager = |
| dsm.getLayoutVersionManager(); |
| Assert.assertEquals(0, dnVersionManager.getMetadataLayoutVersion()); |
| } |
| |
| int countContainers = 0; |
| for (HddsDatanodeService dataNode : cluster.getHddsDatanodes()) { |
| DatanodeStateMachine dsm = dataNode.getDatanodeStateMachine(); |
| // Also verify that all the existing containers are open. |
| for (Iterator<Container<?>> it = |
| dsm.getContainer().getController().getContainers(); it.hasNext();) { |
| Container container = it.next(); |
| Assert.assertTrue(container.getContainerState() == |
| ContainerProtos.ContainerDataProto.State.OPEN); |
| countContainers++; |
| } |
| } |
| Assert.assertTrue(countContainers >= 1); |
| } |
| |
| /* |
| * Helper function to test Post-Upgrade conditions on all the DataNodes. |
| */ |
| private void testPostUpgradeConditionsDataNodes() { |
| try { |
| GenericTestUtils.waitFor(() -> { |
| for (HddsDatanodeService dataNode : cluster.getHddsDatanodes()) { |
| DatanodeStateMachine dsm = dataNode.getDatanodeStateMachine(); |
| try { |
| if ((dsm.queryUpgradeStatus().status() != FINALIZATION_DONE) && |
| (dsm.queryUpgradeStatus().status() != ALREADY_FINALIZED)) { |
| return false; |
| } |
| } catch (IOException e) { |
| LOG.error("Exception. ", e); |
| return false; |
| } |
| } |
| return true; |
| }, 500, 60000); |
| } catch (TimeoutException | InterruptedException e) { |
| Assert.fail("Timeout waiting for Upgrade to complete on Data Nodes."); |
| } |
| |
| int countContainers = 0; |
| for (HddsDatanodeService dataNode : cluster.getHddsDatanodes()) { |
| DatanodeStateMachine dsm = dataNode.getDatanodeStateMachine(); |
| HDDSLayoutVersionManager dnVersionManager = |
| dsm.getLayoutVersionManager(); |
| Assert.assertEquals(dnVersionManager.getSoftwareLayoutVersion(), |
| dnVersionManager.getMetadataLayoutVersion()); |
| Assert.assertTrue(dnVersionManager.getMetadataLayoutVersion() >= 1); |
| |
| // Also verify that all the existing containers are closed. |
| for (Iterator<Container<?>> it = |
| dsm.getContainer().getController().getContainers(); it.hasNext();) { |
| Container container = it.next(); |
| Assert.assertTrue(container.getContainerState() == CLOSED || |
| container.getContainerState() == QUASI_CLOSED); |
| countContainers++; |
| } |
| } |
| Assert.assertTrue(countContainers >= 1); |
| } |
| |
| /* |
| * Helper function to test that we can create new pipelines Post-Upgrade. |
| */ |
| private void testPostUpgradePipelineCreation() throws IOException { |
| Pipeline ratisPipeline1 = scmPipelineManager.createPipeline(RATIS_THREE); |
| scmPipelineManager.openPipeline(ratisPipeline1.getId()); |
| Assert.assertEquals(0, |
| scmPipelineManager.getNumberOfContainers(ratisPipeline1.getId())); |
| PipelineID pid = scmContainerManager.allocateContainer(RATIS_THREE, |
| "Owner1").getPipelineID(); |
| Assert.assertEquals(1, scmPipelineManager.getNumberOfContainers(pid)); |
| Assert.assertEquals(pid, ratisPipeline1.getId()); |
| } |
| |
| /* |
| * Helper function to test DataNode state on the SCM. Note that due to |
| * timing constraints, sometime the node-state can transition to the next |
| * state. This function expects the DataNode to be in NodeState "state" or |
| * "alternateState". Some tests can enforce a unique NodeState test by |
| * setting "alternateState = null". |
| */ |
| private void testDataNodesStateOnSCM(NodeState state, |
| NodeState alternateState) { |
| int countNodes = 0; |
| for (DatanodeDetails dn : scm.getScmNodeManager().getAllNodes()) { |
| try { |
| NodeState dnState = |
| scm.getScmNodeManager().getNodeStatus(dn).getHealth(); |
| Assert.assertTrue((dnState == state) || |
| (alternateState == null ? false : dnState == alternateState)); |
| } catch (NodeNotFoundException e) { |
| e.printStackTrace(); |
| Assert.fail("Node not found"); |
| } |
| ++countNodes; |
| } |
| Assert.assertEquals(NUM_DATA_NODES, countNodes); |
| } |
| |
| /* |
| * Helper function to wait for Pipeline creation. |
| */ |
| private void waitForPipelineCreated() throws Exception { |
| LambdaTestUtils.await(10000, 500, () -> { |
| List<Pipeline> pipelines = |
| scmPipelineManager.getPipelines(RATIS_THREE, OPEN); |
| return pipelines.size() == 1; |
| }); |
| } |
| |
| /* |
| * Helper function for container creation. |
| */ |
| private void createTestContainers() throws IOException { |
| XceiverClientManager xceiverClientManager = new XceiverClientManager(conf); |
| ContainerInfo ci1 = scmContainerManager.allocateContainer( |
| RATIS_THREE, "Owner1"); |
| Pipeline ratisPipeline1 = |
| scmPipelineManager.getPipeline(ci1.getPipelineID()); |
| scmPipelineManager.openPipeline(ratisPipeline1.getId()); |
| XceiverClientSpi client1 = |
| xceiverClientManager.acquireClient(ratisPipeline1); |
| ContainerProtocolCalls.createContainer(client1, |
| ci1.getContainerID(), null); |
| xceiverClientManager.releaseClient(client1, false); |
| } |
| |
| /* |
| * Happy Path Test Case. |
| */ |
| @Test |
| public void testFinalizationFromInitialVersionToLatestVersion() |
| throws Exception { |
| |
| waitForPipelineCreated(); |
| createTestContainers(); |
| |
| // Test the Pre-Upgrade conditions on SCM as well as DataNodes. |
| testPreUpgradeConditionsSCM(); |
| testPreUpgradeConditionsDataNodes(); |
| |
| Set<PipelineID> preUpgradeOpenPipelines = |
| scmPipelineManager.getPipelines(RATIS_THREE, OPEN) |
| .stream() |
| .map(Pipeline::getId) |
| .collect(Collectors.toSet()); |
| |
| // Trigger Finalization on the SCM |
| StatusAndMessages status = scm.finalizeUpgrade("xyz"); |
| Assert.assertEquals(STARTING_FINALIZATION, status.status()); |
| |
| // Wait for the Finalization to complete on the SCM. |
| while (status.status() != FINALIZATION_DONE) { |
| status = scm.queryUpgradeFinalizationProgress("xyz", false, false); |
| } |
| |
| Set<PipelineID> postUpgradeOpenPipelines = |
| scmPipelineManager.getPipelines(RATIS_THREE, OPEN) |
| .stream() |
| .map(Pipeline::getId) |
| .collect(Collectors.toSet()); |
| |
| // No pipelines from before the upgrade should still be open after the |
| // upgrade. |
| long numPreUpgradeOpenPipelines = preUpgradeOpenPipelines |
| .stream() |
| .filter(postUpgradeOpenPipelines::contains) |
| .count(); |
| Assert.assertEquals(0, numPreUpgradeOpenPipelines); |
| |
| // Verify Post-Upgrade conditions on the SCM. |
| testPostUpgradeConditionsSCM(); |
| |
| // All datanodes on the SCM should have moved to HEALTHY-READONLY state. |
| testDataNodesStateOnSCM(HEALTHY_READONLY, HEALTHY); |
| |
| // Verify the SCM has driven all the DataNodes through Layout Upgrade. |
| testPostUpgradeConditionsDataNodes(); |
| |
| // Test that we can use a pipeline after upgrade. |
| // Will fail with exception if there are no pipelines. |
| ObjectStore store = cluster.getClient().getObjectStore(); |
| store.createVolume("vol1"); |
| store.getVolume("vol1").createBucket("buc1"); |
| store.getVolume("vol1").getBucket("buc1").createKey("key1", 100, |
| ReplicationType.RATIS, ReplicationFactor.THREE, new HashMap<>()); |
| |
| } |
| |
/*
 * All the subsequent tests are failure cases. Some of them fail one or
 * more nodes simultaneously, at specific execution points and in different
 * thread contexts. The key execution points along the upgrade path are
 * defined in InjectedUpgradeFinalizationExecutor.UpgradeTestInjectionPoints.
 */
| |
| /* |
| * Helper function to inject SCM failure and a SCM restart at a given |
| * execution point during SCM-Upgrade. |
| * |
| * Injects Failure in : SCM |
| * Executing-Thread-Context : SCM-Upgrade |
| */ |
| private Boolean injectSCMFailureDuringSCMUpgrade() |
| throws InterruptedException, TimeoutException, AuthenticationException, |
| IOException { |
| // For some tests this could get called in a different thread context. |
| // We need to guard concurrent updates to the cluster. |
| synchronized(cluster) { |
| cluster.restartStorageContainerManager(true); |
| loadSCMState(); |
| } |
| // The ongoing current SCM Upgrade is getting aborted at this point. We |
| // need to schedule a new SCM Upgrade on a different thread context. |
| Thread t = new Thread(new Runnable() { |
| @Override |
| public void run() { |
| try { |
| loadSCMState(); |
| scm.finalizeUpgrade("xyz"); |
| } catch (IOException e) { |
| e.printStackTrace(); |
| testPassed.set(false); |
| } |
| } |
| }); |
| t.start(); |
| return true; |
| } |
| |
| /* |
| * Helper function to inject DataNode failures and DataNode restarts at a |
| * given execution point during SCM-Upgrade. Please note that it fails all |
| * the DataNodes in the cluster and is part of test cases that simulate |
| * multi-node failure at specific code-execution points during SCM Upgrade. |
| * Please note that this helper function should be called in the thread |
| * context of an SCM-Upgrade only. The return value has a significance that |
| * it does not abort the currently ongoing SCM upgrade. because this |
| * failure injection does not fail the SCM node and only impacts datanodes, |
| * we do not need to schedule another scm-finalize-upgrade here. |
| * |
| * Injects Failure in : All the DataNodes |
| * Executing-Thread-Context : SCM-Upgrade |
| */ |
| private Boolean injectDataNodeFailureDuringSCMUpgrade() { |
| try { |
| // Work on a Copy of current set of DataNodes to avoid |
| // running into tricky situations. |
| List<HddsDatanodeService> currentDataNodes = |
| new ArrayList<>(cluster.getHddsDatanodes()); |
| for (HddsDatanodeService ds: currentDataNodes) { |
| DatanodeDetails dn = ds.getDatanodeDetails(); |
| LOG.info("Restarting datanode {}", dn.getUuidString()); |
| cluster.restartHddsDatanode(dn, false); |
| } |
| cluster.waitForClusterToBeReady(); |
| } catch (Exception e) { |
| LOG.info("DataNode Restarts Failed!"); |
| testPassed.set(false); |
| } |
| loadSCMState(); |
| // returning false from injection function, continues currently ongoing |
| // SCM-Upgrade-Finalization. |
| return false; |
| } |
| |
| /* |
| * Helper function to inject a DataNode failure and restart for a specific |
| * DataNode. This injection function can target a specific DataNode and |
| * thus facilitates getting called in the upgrade-finalization thread context |
| * of that specific DataNode. |
| * |
| * Injects Failure in : Given DataNodes |
| * Executing-Thread-Context : the same DataNode that we are failing here. |
| */ |
| private Thread injectDataNodeFailureDuringDataNodeUpgrade( |
| DatanodeDetails dn) { |
| Thread t = null; |
| try { |
| // Schedule the DataNode restart on a separate thread context |
| // otherwise DataNode restart will hang. Also any cluster modification |
| // needs to be guarded since it could get modified in multiple independent |
| // threads. |
| t = new Thread(new Runnable() { |
| @Override |
| public void run() { |
| try { |
| synchronized (cluster) { |
| cluster.restartHddsDatanode(dn, true); |
| } |
| } catch (Exception e) { |
| e.printStackTrace(); |
| testPassed.set(false); |
| } |
| } |
| }); |
| } catch (Exception e) { |
| LOG.info("DataNode Restart Failed!"); |
| Assert.fail(e.getMessage()); |
| } |
| return t; |
| } |
| |
| /* |
| * Helper function to inject coordinated failures and restarts across |
| * all the DataNode as well as SCM. This can help create targeted test cases |
| * to inject such comprehensive failures in SCM-Upgrade-Context as well as |
| * DataNode-Upgrade-Context. |
| * |
| * Injects Failure in : SCM as well as ALL the DataNodes. |
| * Executing-Thread-Context : Either the SCM-Upgrade-Finalizer or the |
| * DataNode-Upgrade-Finalizer. |
| */ |
| private Thread injectSCMAndDataNodeFailureTogetherAtTheSameTime() |
| throws InterruptedException, TimeoutException, AuthenticationException, |
| IOException { |
| // This needs to happen in a separate thread context otherwise |
| // DataNode restart will hang. |
| return new Thread(new Runnable() { |
| @Override |
| public void run() { |
| try { |
| // Since we are modifying cluster in an independent thread context, |
| // we synchronize access to it to avoid concurrent modification |
| // exception. |
| synchronized (cluster) { |
| // Work on a Copy of current set of DataNodes to avoid |
| // running into tricky situations. |
| List<HddsDatanodeService> currentDataNodes = |
| new ArrayList<>(cluster.getHddsDatanodes()); |
| for (HddsDatanodeService ds: currentDataNodes) { |
| DatanodeDetails dn = ds.getDatanodeDetails(); |
| cluster.restartHddsDatanode(dn, false); |
| } |
| cluster.restartStorageContainerManager(false); |
| cluster.waitForClusterToBeReady(); |
| } |
| } catch (Exception e) { |
| e.printStackTrace(); |
| testPassed.set(false); |
| } |
| } |
| }); |
| } |
| |
/*
 * The test cases below target single-node and multi-node failures.
 */
| |
| /* |
| * One node(SCM) failure case: |
| * Thread-Context : SCM-Upgrade |
| * |
| * Test SCM failure During SCM Upgrade before execution point |
| * "PreFinalizeUpgrade". All meaningful Upgrade execution points |
| * are defined in UpgradeFinalizer:UpgradeTestInjectionPoints. |
| */ |
| @Test |
| public void testScmFailuresBeforeScmPreFinalizeUpgrade() |
| throws Exception { |
| testPassed.set(true); |
| InjectedUpgradeFinalizationExecutor scmFinalizationExecutor = |
| new InjectedUpgradeFinalizationExecutor(); |
| ((BasicUpgradeFinalizer)scm.getUpgradeFinalizer()) |
| .setFinalizationExecutor(scmFinalizationExecutor); |
| scmFinalizationExecutor.configureTestInjectionFunction( |
| BEFORE_PRE_FINALIZE_UPGRADE, |
| () -> { |
| return this.injectSCMFailureDuringSCMUpgrade(); |
| }); |
| testFinalizationWithFailureInjectionHelper(null); |
| Assert.assertTrue(testPassed.get()); |
| } |
| |
| /* |
| * One node(SCM) failure case: |
| * Thread-Context : SCM-Upgrade |
| * |
| * Test SCM failure During SCM Upgrade after execution point |
| * "PreFinalizeUpgrade". All meaningful Upgrade execution points |
| * are defined in UpgradeFinalizer:UpgradeTestInjectionPoints. |
| */ |
| @Test |
| public void testScmFailuresAfterScmPreFinalizeUpgrade() |
| throws Exception { |
| testPassed.set(true); |
| InjectedUpgradeFinalizationExecutor scmFinalizationExecutor = |
| new InjectedUpgradeFinalizationExecutor(); |
| ((BasicUpgradeFinalizer)scm.getUpgradeFinalizer()) |
| .setFinalizationExecutor(scmFinalizationExecutor); |
| scmFinalizationExecutor.configureTestInjectionFunction( |
| AFTER_PRE_FINALIZE_UPGRADE, |
| () -> { |
| return this.injectSCMFailureDuringSCMUpgrade(); |
| }); |
| testFinalizationWithFailureInjectionHelper(null); |
| Assert.assertTrue(testPassed.get()); |
| } |
| |
| /* |
| * One node(SCM) failure case: |
| * Thread-Context : SCM-Upgrade |
| * |
| * Test SCM failure During SCM Upgrade after execution point |
| * "CompleteFinalization". All meaningful Upgrade execution points |
| * are defined in UpgradeFinalizer:UpgradeTestInjectionPoints. |
| */ |
| @Test |
| public void testScmFailuresAfterScmCompleteFinalization() |
| throws Exception { |
| testPassed.set(true); |
| InjectedUpgradeFinalizationExecutor scmFinalizationExecutor = |
| new InjectedUpgradeFinalizationExecutor(); |
| ((BasicUpgradeFinalizer)scm.getUpgradeFinalizer()) |
| .setFinalizationExecutor(scmFinalizationExecutor); |
| scmFinalizationExecutor.configureTestInjectionFunction( |
| AFTER_COMPLETE_FINALIZATION, |
| () -> { |
| return this.injectSCMFailureDuringSCMUpgrade(); |
| }); |
| testFinalizationWithFailureInjectionHelper(null); |
| Assert.assertTrue(testPassed.get()); |
| } |
| |
| /* |
| * One node(SCM) failure case: |
| * Thread-Context : SCM-Upgrade |
| * |
| * Test SCM failure During SCM Upgrade after execution point |
| * "PostFinalizeUpgrade". All meaningful Upgrade execution points |
| * are defined in UpgradeFinalizer:UpgradeTestInjectionPoints. |
| */ |
| @Test |
| public void testScmFailuresAfterScmPostFinalizeUpgrade() |
| throws Exception { |
| testPassed.set(true); |
| InjectedUpgradeFinalizationExecutor scmFinalizationExecutor = |
| new InjectedUpgradeFinalizationExecutor(); |
| ((BasicUpgradeFinalizer)scm.getUpgradeFinalizer()) |
| .setFinalizationExecutor(scmFinalizationExecutor); |
| scmFinalizationExecutor.configureTestInjectionFunction( |
| AFTER_POST_FINALIZE_UPGRADE, |
| () -> { |
| return this.injectSCMFailureDuringSCMUpgrade(); |
| }); |
| testFinalizationWithFailureInjectionHelper(null); |
| Assert.assertTrue(testPassed.get()); |
| } |
| |
| /* |
| * Multi node(all DataNodes) failure case: |
| * Thread-Context : SCM-Upgrade |
| * |
| * Test all DataNode failures During SCM Upgrade before execution point |
| * "PreFinalizeUpgrade". All meaningful Upgrade execution points |
| * are defined in UpgradeFinalizer:UpgradeTestInjectionPoints. |
| */ |
| @Test |
| public void testAllDataNodeFailuresBeforeScmPreFinalizeUpgrade() |
| throws Exception { |
| testPassed.set(true); |
| InjectedUpgradeFinalizationExecutor scmFinalizationExecutor = |
| new InjectedUpgradeFinalizationExecutor(); |
| ((BasicUpgradeFinalizer)scm.getUpgradeFinalizer()) |
| .setFinalizationExecutor(scmFinalizationExecutor); |
| scmFinalizationExecutor.configureTestInjectionFunction( |
| BEFORE_PRE_FINALIZE_UPGRADE, |
| () -> { |
| return injectDataNodeFailureDuringSCMUpgrade(); |
| }); |
| testFinalizationWithFailureInjectionHelper(null); |
| Assert.assertTrue(testPassed.get()); |
| } |
| |
| /* |
| * Multi node(all DataNodes) failure case: |
| * Thread-Context : SCM-Upgrade |
| * |
| * Test all DataNode failures During SCM Upgrade before execution point |
| * "PreFinalizeUpgrade". All meaningful Upgrade execution points |
| * are defined in UpgradeFinalizer:UpgradeTestInjectionPoints. |
| */ |
| @Test |
| public void testAllDataNodeFailuresAfterScmPreFinalizeUpgrade() |
| throws Exception { |
| testPassed.set(true); |
| InjectedUpgradeFinalizationExecutor scmFinalizationExecutor = |
| new InjectedUpgradeFinalizationExecutor(); |
| ((BasicUpgradeFinalizer)scm.getUpgradeFinalizer()) |
| .setFinalizationExecutor(scmFinalizationExecutor); |
| scmFinalizationExecutor.configureTestInjectionFunction( |
| AFTER_PRE_FINALIZE_UPGRADE, |
| () -> { |
| return injectDataNodeFailureDuringSCMUpgrade(); |
| }); |
| testFinalizationWithFailureInjectionHelper(null); |
| Assert.assertTrue(testPassed.get()); |
| } |
| |
| /* |
| * Multi node(all DataNodes) failure case: |
| * Thread-Context : SCM-Upgrade |
| * |
| * Test all DataNode failures During SCM Upgrade after execution point |
| * "CompleteFinalization". All meaningful Upgrade execution points |
| * are defined in UpgradeFinalizer:UpgradeTestInjectionPoints. |
| */ |
| @Test |
| public void testAllDataNodeFailuresAfterScmCompleteFinalization() |
| throws Exception { |
| testPassed.set(true); |
| InjectedUpgradeFinalizationExecutor scmFinalizationExecutor = |
| new InjectedUpgradeFinalizationExecutor(); |
| ((BasicUpgradeFinalizer)scm.getUpgradeFinalizer()) |
| .setFinalizationExecutor(scmFinalizationExecutor); |
| scmFinalizationExecutor.configureTestInjectionFunction( |
| AFTER_COMPLETE_FINALIZATION, |
| () -> { |
| return injectDataNodeFailureDuringSCMUpgrade(); |
| }); |
| testFinalizationWithFailureInjectionHelper(null); |
| Assert.assertTrue(testPassed.get()); |
| } |
| |
| /* |
| * Multi node(all DataNodes) failure case: |
| * Thread-Context : SCM-Upgrade |
| * |
| * Test all DataNode failures During SCM Upgrade after execution point |
| * "PostFinalizeUpgrade". All meaningful Upgrade execution points |
| * are defined in UpgradeFinalizer:UpgradeTestInjectionPoints. |
| */ |
| @Test |
| public void testAllDataNodeFailuresAfterScmPostFinalizeUpgrade() |
| throws Exception { |
| testPassed.set(true); |
| InjectedUpgradeFinalizationExecutor scmFinalizationExecutor = |
| new InjectedUpgradeFinalizationExecutor(); |
| ((BasicUpgradeFinalizer)scm.getUpgradeFinalizer()) |
| .setFinalizationExecutor(scmFinalizationExecutor); |
| scmFinalizationExecutor.configureTestInjectionFunction( |
| AFTER_POST_FINALIZE_UPGRADE, |
| () -> { |
| return injectDataNodeFailureDuringSCMUpgrade(); |
| }); |
| testFinalizationWithFailureInjectionHelper(null); |
| Assert.assertTrue(testPassed.get()); |
| } |
| |
| /* |
| * Single node(targeted DataNode) failure case: |
| * Thread-Context : DataNode-Upgrade. |
| * |
| * Fail the same DataNode that is going through Upgrade-processing at a |
| * specific code execution point. This test covers all the meaningful |
| * Upgrade execution points as defined in |
| * UpgradeFinalizer:UpgradeTestInjectionPoints. |
| */ |
| @Ignore |
| @Test |
| public void testDataNodeFailuresDuringDataNodeUpgrade() |
| throws Exception { |
| for (UpgradeTestInjectionPoints injectionPoint: |
| UpgradeTestInjectionPoints.values()) { |
| testPassed.set(true); |
| // Configure a given data node to fail itself when it's |
| // corresponding Upgrade-Finalizer reaches a specific point in it's |
| // execution. |
| HddsDatanodeService ds = cluster.getHddsDatanodes().get(1); |
| Thread failureInjectionThread = |
| injectDataNodeFailureDuringDataNodeUpgrade(ds.getDatanodeDetails()); |
| InjectedUpgradeFinalizationExecutor dataNodeFinalizationExecutor = |
| new InjectedUpgradeFinalizationExecutor(); |
| dataNodeFinalizationExecutor.configureTestInjectionFunction( |
| injectionPoint, () -> { |
| failureInjectionThread.start(); |
| return true; |
| }); |
| ((BasicUpgradeFinalizer)ds.getDatanodeStateMachine() |
| .getUpgradeFinalizer()) |
| .setFinalizationExecutor(dataNodeFinalizationExecutor); |
| testFinalizationWithFailureInjectionHelper(failureInjectionThread); |
| Assert.assertTrue(testPassed.get()); |
| synchronized (cluster) { |
| shutdown(); |
| init(); |
| } |
| LOG.info("testDataNodeFailuresDuringDataNodeUpgrade: Failure Injection " + |
| "Point {} passed.", injectionPoint.name()); |
| } |
| } |
| |
| /* |
| * Two nodes(SCM and a targeted DataNode) combination failure case: |
| * Thread-Contexts : |
| * DataNode failure in its own DataNode-Upgrade-Context . |
| * SCM failure in its own SCM-Upgrade-Context . |
| * |
| * Fail the same DataNode that is going through its own Upgrade-processing |
| * at a specific code execution point. Also fail the SCM when SCM is going |
| * through upgrade-finalization. This test covers all the combinations of |
| * SCM-Upgrade-execution points and DataNode-Upgrade-execution points. |
| */ |
| @Ignore |
| @Test |
| public void testAllPossibleDataNodeFailuresAndSCMFailures() |
| throws Exception { |
| // Configure a given data node to restart itself when it's |
| // corresponding Upgrade-Finalizer reaches a specific point in it's |
| // execution. |
| for (UpgradeTestInjectionPoints scmInjectionPoint : |
| UpgradeTestInjectionPoints.values()) { |
| InjectedUpgradeFinalizationExecutor scmFinalizationExecutor = |
| new InjectedUpgradeFinalizationExecutor(); |
| scmFinalizationExecutor.configureTestInjectionFunction( |
| scmInjectionPoint, |
| () -> { |
| return this.injectSCMFailureDuringSCMUpgrade(); |
| }); |
| ((BasicUpgradeFinalizer)scm.getUpgradeFinalizer()) |
| .setFinalizationExecutor(scmFinalizationExecutor); |
| |
| for (UpgradeTestInjectionPoints datanodeInjectionPoint : |
| UpgradeTestInjectionPoints.values()) { |
| HddsDatanodeService ds = cluster.getHddsDatanodes().get(1); |
| testPassed.set(true); |
| Thread dataNodefailureInjectionThread = |
| injectDataNodeFailureDuringDataNodeUpgrade(ds.getDatanodeDetails()); |
| InjectedUpgradeFinalizationExecutor dataNodeFinalizationExecutor = |
| new InjectedUpgradeFinalizationExecutor(); |
| dataNodeFinalizationExecutor.configureTestInjectionFunction( |
| datanodeInjectionPoint, () -> { |
| dataNodefailureInjectionThread.start(); |
| return true; |
| }); |
| ((BasicUpgradeFinalizer)ds.getDatanodeStateMachine() |
| .getUpgradeFinalizer()) |
| .setFinalizationExecutor(dataNodeFinalizationExecutor); |
| testFinalizationWithFailureInjectionHelper( |
| dataNodefailureInjectionThread); |
| Assert.assertTrue(testPassed.get()); |
| synchronized (cluster) { |
| shutdown(); |
| init(); |
| } |
| LOG.info("testAllPossibleDataNodeFailuresAndSCMFailures: " + |
| "DataNode-Failure-Injection-Point={} with " + |
| "Scm-FailureInjection-Point={} passed.", |
| datanodeInjectionPoint.name(), scmInjectionPoint.name()); |
| } |
| } |
| } |
| |
| /* |
| * Two nodes(SCM and a targeted DataNode together at the same time) |
| * combination failure case: |
| * Thread-Contexts : |
| * SCM-Upgrade-Finalizer-Context |
| * |
| * Fail the DataNode and the SCM together when the SCM is going |
| * through upgrade. This test covers all the combinations of |
| * SCM-Upgrade-execution points. |
| */ |
| @Ignore |
| @Test |
| public void testDataNodeAndSCMFailuresTogetherDuringSCMUpgrade() |
| throws Exception { |
| for (UpgradeTestInjectionPoints injectionPoint : |
| UpgradeTestInjectionPoints.values()) { |
| testPassed.set(true); |
| Thread helpingFailureInjectionThread = |
| injectSCMAndDataNodeFailureTogetherAtTheSameTime(); |
| InjectedUpgradeFinalizationExecutor scmFinalizationExecutor = |
| new InjectedUpgradeFinalizationExecutor(); |
| scmFinalizationExecutor.configureTestInjectionFunction( |
| injectionPoint, () -> { |
| helpingFailureInjectionThread.start(); |
| return true; |
| }); |
| ((BasicUpgradeFinalizer)scm.getUpgradeFinalizer()) |
| .setFinalizationExecutor(scmFinalizationExecutor); |
| testFinalizationWithFailureInjectionHelper(helpingFailureInjectionThread); |
| Assert.assertTrue(testPassed.get()); |
| synchronized (cluster) { |
| shutdown(); |
| init(); |
| } |
| LOG.info("testDataNodeAndSCMFailuresTogetherDuringSCMUpgrade: Failure " + |
| "Injection Point {} passed.", injectionPoint.name()); |
| } |
| } |
| |
| /* |
| * Two nodes(SCM and a targeted DataNode together at the same time) |
| * combination failure case: |
| * Thread-Contexts : |
| * DataNode-Upgrade-Finalizer-Context. |
| * |
| * Fail the DataNode and the SCM together when the DataNode is going |
| * through upgrade. This test covers all the combinations of |
| * DataNode-Upgrade-execution points. |
| */ |
| @Ignore |
| @Test |
| public void testDataNodeAndSCMFailuresTogetherDuringDataNodeUpgrade() |
| throws Exception { |
| for (UpgradeTestInjectionPoints injectionPoint : |
| UpgradeTestInjectionPoints.values()) { |
| testPassed.set(true); |
| Thread helpingFailureInjectionThread = |
| injectSCMAndDataNodeFailureTogetherAtTheSameTime(); |
| HddsDatanodeService ds = cluster.getHddsDatanodes().get(1); |
| InjectedUpgradeFinalizationExecutor dataNodeFinalizationExecutor = |
| new InjectedUpgradeFinalizationExecutor(); |
| dataNodeFinalizationExecutor.configureTestInjectionFunction( |
| injectionPoint, () -> { |
| helpingFailureInjectionThread.start(); |
| return true; |
| }); |
| ((BasicUpgradeFinalizer)ds.getDatanodeStateMachine() |
| .getUpgradeFinalizer()) |
| .setFinalizationExecutor(dataNodeFinalizationExecutor); |
| testFinalizationWithFailureInjectionHelper(helpingFailureInjectionThread); |
| Assert.assertTrue(testPassed.get()); |
| synchronized (cluster) { |
| shutdown(); |
| init(); |
| } |
| LOG.info("testDataNodeAndSCMFailuresTogetherDuringDataNodeUpgrade: " + |
| "Failure Injection Point {} passed.", injectionPoint.name()); |
| } |
| } |
| |
| public void testFinalizationWithFailureInjectionHelper( |
| Thread failureInjectionThread) throws Exception { |
| |
| waitForPipelineCreated(); |
| createTestContainers(); |
| createKey(); |
| |
| // Test the Pre-Upgrade conditions on SCM as well as DataNodes. |
| testPreUpgradeConditionsSCM(); |
| testPreUpgradeConditionsDataNodes(); |
| |
| // Trigger Finalization on the SCM |
| StatusAndMessages status = scm.finalizeUpgrade("xyz"); |
| Assert.assertEquals(STARTING_FINALIZATION, status.status()); |
| |
| // Make sure that any outstanding thread created by failure injection |
| // has completed its job. |
| if (failureInjectionThread != null) { |
| failureInjectionThread.join(); |
| } |
| |
| // Wait for the Finalization to complete on the SCM. |
| // Failure injection could have restarted the SCM and it could be in |
| // ALREADY_FINALIZED state as well. |
| while ((status.status() != FINALIZATION_DONE) && |
| (status.status() != ALREADY_FINALIZED)) { |
| loadSCMState(); |
| status = scm.queryUpgradeFinalizationProgress("xyz", true, false); |
| if (status.status() == FINALIZATION_REQUIRED) { |
| status = scm.finalizeUpgrade("xyz"); |
| } |
| } |
| |
| // Verify Post-Upgrade conditions on the SCM. |
| // With failure injection |
| testPostUpgradeConditionsSCM(); |
| |
| // All datanodes on the SCM should have moved to HEALTHY-READONLY state. |
| // Due to timing constraint also allow a "HEALTHY" state. |
| loadSCMState(); |
| testDataNodesStateOnSCM(HEALTHY_READONLY, HEALTHY); |
| |
| // Need to wait for post finalization heartbeat from DNs. |
| LambdaTestUtils.await(600000, 500, () -> { |
| try { |
| loadSCMState(); |
| testDataNodesStateOnSCM(HEALTHY, null); |
| sleep(100); |
| } catch (Throwable ex) { |
| LOG.info(ex.getMessage()); |
| return false; |
| } |
| return true; |
| }); |
| |
| // Verify the SCM has driven all the DataNodes through Layout Upgrade. |
| testPostUpgradeConditionsDataNodes(); |
| |
| // Verify that new pipeline can be created with upgraded datanodes. |
| try { |
| testPostUpgradePipelineCreation(); |
| } catch(SCMException e) { |
| // If pipeline creation fails, make sure that there is a valid reason |
| // for this i.e. all datanodes are already part of some pipeline. |
| for (HddsDatanodeService dataNode : cluster.getHddsDatanodes()) { |
| DatanodeStateMachine dsm = dataNode.getDatanodeStateMachine(); |
| Set<PipelineID> pipelines = |
| scm.getScmNodeManager().getPipelines(dsm.getDatanodeDetails()); |
| Assert.assertTrue(pipelines != null); |
| } |
| } |
| } |
| } |