| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with this |
| * work for additional information regarding copyright ownership. The ASF |
| * licenses this file to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| * License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| |
| package org.apache.hadoop.ozone.om; |
| |
| import static java.nio.charset.StandardCharsets.UTF_8; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.nio.file.Paths; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Set; |
| import java.util.UUID; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.Future; |
| |
| import org.apache.hadoop.hdds.client.ReplicationFactor; |
| import org.apache.hadoop.hdds.client.ReplicationType; |
| import org.apache.hadoop.ozone.MiniOzoneHAClusterImpl; |
| import org.apache.hadoop.ozone.client.ObjectStore; |
| import org.apache.hadoop.ozone.client.OzoneVolume; |
| import org.apache.hadoop.ozone.client.io.OzoneOutputStream; |
| import org.apache.hadoop.ozone.client.protocol.ClientProtocol; |
| import org.apache.hadoop.ozone.container.ContainerTestHelper; |
| import org.apache.hadoop.ozone.container.TestHelper; |
| import org.apache.hadoop.ozone.om.exceptions.OMException; |
| import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; |
| import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PrepareStatusResponse.PrepareStatus; |
| import org.apache.ozone.test.GenericTestUtils; |
| import org.apache.ozone.test.LambdaTestUtils; |
| import org.apache.ozone.test.tag.Flaky; |
| import org.apache.ozone.test.tag.Slow; |
| import org.apache.ratis.util.ExitUtils; |
| import org.junit.Assert; |
| import org.junit.jupiter.api.BeforeEach; |
| import org.junit.jupiter.api.Disabled; |
| import org.junit.jupiter.api.Test; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * Tests OM prepare and cancel prepare against an actual mini cluster. |
| */ |
| @Flaky("HDDS-5990") |
| public class TestOzoneManagerPrepare extends TestOzoneManagerHA { |
| private static final String BUCKET = "bucket"; |
| private static final String VOLUME = "volume"; |
| private static final String KEY_PREFIX = "key"; |
| |
| // Maximum time to wait for cluster state conditions, including Ratis logs. |
| private static final int WAIT_TIMEOUT_MILLIS = 120000; |
| private static final long PREPARE_FLUSH_WAIT_TIMEOUT_SECONDS = 120L; |
| private static final long PREPARE_FLUSH_INTERVAL_SECONDS = 5L; |
| |
| private MiniOzoneHAClusterImpl cluster; |
| private ClientProtocol clientProtocol; |
| private ObjectStore store; |
| |
| private static final Logger LOG = |
| LoggerFactory.getLogger(TestOzoneManagerPrepare.class); |
| |
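| /** |
| * Caches references to the cluster, object store, and client protocol |
| * from the HA test base class. |
| */ |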
| public void setup() throws Exception { |
| cluster = getCluster(); |
| store = getObjectStore(); |
| clientProtocol = store.getClientProxy(); |
| } |
| |
| /** |
| * Makes sure the cluster has an OM leader and is out of the prepare state |
| * before each test runs. |
| */ |
| @BeforeEach |
| public void initOM() throws Exception { |
| setup(); |
| LOG.info("Waiting for OM leader election"); |
| GenericTestUtils.waitFor(() -> cluster.getOMLeader() != null, |
| 1000, WAIT_TIMEOUT_MILLIS); |
| submitCancelPrepareRequest(); |
| assertClusterNotPrepared(); |
| } |
| |
| /** |
| * Writes data to the cluster via the leader OM, and then prepares it. |
| * Checks that every OM is prepared successfully. |
| */ |
| @Test |
| public void testPrepareWithTransactions() throws Exception { |
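| // First prepare and cancel prepare with no new transactions, to verify |
| // that prepare works on a quiet cluster before any data is written. |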
| long prepareIndex = submitPrepareRequest(); |
| assertClusterPrepared(prepareIndex); |
| assertRatisLogsCleared(); |
| |
| submitCancelPrepareRequest(); |
| assertClusterNotPrepared(); |
| |
| String volumeName = VOLUME + UUID.randomUUID(); |
| Set<String> writtenKeys = writeKeysAndWaitForLogs(volumeName, 50, |
| cluster.getOzoneManagersList()); |
| prepareIndex = submitPrepareRequest(); |
| |
| // Make sure all OMs are prepared and all OMs still have their data. |
| assertClusterPrepared(prepareIndex); |
| assertRatisLogsCleared(); |
| assertKeysWritten(volumeName, writtenKeys); |
| |
| // Should be able to "prepare" the OM group again. |
| assertShouldBeAbleToPrepare(); |
| } |
| |
| /** |
| * Writes data to the cluster, shuts down one OM, and writes more data. |
| * Then submits prepare as a Ratis request and checks that the two live |
| * OMs are prepared. Finally, revives the downed OM and checks that it |
| * receives all the transactions it missed and becomes prepared as well. |
| */ |
| @Test |
| @Disabled("RATIS-1481") // until upgrade to Ratis 2.3.0 |
| public void testPrepareDownedOM() throws Exception { |
| // Index of the OM that will be shut down during this test. |
| final int shutdownOMIndex = 2; |
| List<OzoneManager> runningOms = cluster.getOzoneManagersList(); |
| |
| String volumeName1 = VOLUME + UUID.randomUUID(); |
| // Create keys with all 3 OMs up. |
| Set<String> writtenKeysBeforeOmShutDown = writeKeysAndWaitForLogs( |
| volumeName1, 10, runningOms); |
| |
| // Shut down one OM. |
| cluster.stopOzoneManager(shutdownOMIndex); |
| OzoneManager downedOM = cluster.getOzoneManager(shutdownOMIndex); |
| Assert.assertFalse(downedOM.isRunning()); |
| // List#remove returns the removed element, which should be the downed OM. |
| Assert.assertEquals(downedOM, runningOms.remove(shutdownOMIndex)); |
| |
| // Write keys with the remaining OMs up. |
| String volumeName2 = VOLUME + UUID.randomUUID(); |
| Set<String> writtenKeysAfterOmShutDown = |
| writeKeysAndWaitForLogs(volumeName2, 10, runningOms); |
| |
| long prepareIndex = submitPrepareRequest(); |
| |
| // Check that the two live OMs are prepared. |
| assertClusterPrepared(prepareIndex, runningOms); |
| |
| // Restart the downed OM and wait for it to catch up. |
| // Since prepare was the last Ratis transaction, it should have all data |
| // it missed once it receives the prepare transaction. |
| cluster.restartOzoneManager(downedOM, true); |
| runningOms.add(shutdownOMIndex, downedOM); |
| ExitUtils.assertNotTerminated(); |
| |
| // Make sure all OMs are prepared and still have data. |
| assertClusterPrepared(prepareIndex, runningOms); |
| assertKeysWritten(volumeName1, writtenKeysBeforeOmShutDown, runningOms); |
| assertKeysWritten(volumeName2, writtenKeysAfterOmShutDown, runningOms); |
| |
| // Cancel the prepare state of the cluster so writes are allowed again. |
| submitCancelPrepareRequest(); |
| assertClusterNotPrepared(); |
| |
| // Should be able to write data to all 3 OMs. |
| String volumeName3 = VOLUME + UUID.randomUUID(); |
| store.createVolume(volumeName3); |
| for (OzoneManager om : runningOms) { |
| LambdaTestUtils.await(WAIT_TIMEOUT_MILLIS, 1000, () -> { |
| OMMetadataManager metadataManager = om.getMetadataManager(); |
| String volumeKey = metadataManager.getVolumeKey(volumeName3); |
| return metadataManager.getVolumeTable().get(volumeKey) != null; |
| }); |
| } |
| } |
| |
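| /** |
| * Prepares the cluster, restarts all OMs, and checks that the cluster is |
| * still prepared at the same index after the restart. |
| */ |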
| @Test |
| public void testPrepareWithRestart() throws Exception { |
| // Create a fresh cluster for this test, to avoid timeouts caused by |
| // restarting a cluster modified by earlier tests. |
| shutdown(); |
| init(); |
| setup(); |
| |
| String volumeName = VOLUME + UUID.randomUUID(); |
| writeKeysAndWaitForLogs(volumeName, 10); |
| |
| long prepareIndex = submitPrepareRequest(); |
| assertClusterPrepared(prepareIndex); |
| |
| // Restart all ozone managers. |
| cluster.restartOzoneManager(); |
| |
| // No check for cleared logs, since Ratis meta transactions may slip in |
| // on restart. |
| assertClusterPrepared(prepareIndex); |
| } |
| |
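| /** |
| * With two of the three OMs down, a prepare request cannot reach a Ratis |
| * quorum and should fail with an IOException. |
| */ |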
| @Slow("Saving on CI time since this is a pessimistic test. We should not " + |
| "be able to do anything with 2 OMs down.") |
| @Test |
| public void testPrepareFailsWhenTwoOmsAreDown() throws Exception { |
| // Shut down 2 OMs. |
| for (int i : Arrays.asList(1, 2)) { |
| cluster.stopOzoneManager(i); |
| OzoneManager downedOM = cluster.getOzoneManager(i); |
| Assert.assertFalse(downedOM.isRunning()); |
| } |
| |
| LambdaTestUtils.intercept(IOException.class, |
| () -> clientProtocol.getOzoneManagerClient().prepareOzoneManager( |
| PREPARE_FLUSH_WAIT_TIMEOUT_SECONDS, |
| PREPARE_FLUSH_INTERVAL_SECONDS)); |
| } |
| |
| /** |
| * Issues requests on ten different threads, one of which is a prepare |
| * request and the rest of which are create volume requests. We cannot be |
| * sure of the exact order in which the requests will execute, so this test |
| * checks that the cluster ends in a prepared state, and that create volume |
| * requests either succeed or fail with an error indicating the cluster was |
| * prepared before they were encountered. |
| */ |
| @Test |
| public void testPrepareWithMultipleThreads() throws Exception { |
| final int numThreads = 10; |
| final int prepareTaskIndex = 5; |
| |
| ExecutorService executorService = Executors.newFixedThreadPool(numThreads); |
| // For the prepare task, the future will return a log index. |
| // For the create volume tasks, 0 (dummy value) will be returned. |
| List<Future<Long>> tasks = new ArrayList<>(); |
| |
| for (int i = 0; i < numThreads; i++) { |
| Callable<Long> task; |
| if (i == prepareTaskIndex) { |
| task = this::submitPrepareRequest; |
| } else { |
| String volumeName = VOLUME + i; |
| task = () -> { |
| clientProtocol.createVolume(volumeName); |
| return 0L; |
| }; |
| } |
| tasks.add(executorService.submit(task)); |
| } |
| |
| // For each task, wait for it to complete and check its result. |
| for (int i = 0; i < numThreads; i++) { |
| Future<Long> future = tasks.get(i); |
| |
| if (i == prepareTaskIndex) { |
| assertClusterPrepared(future.get()); |
| assertRatisLogsCleared(); |
| } else { |
| try { |
| // If this throws an exception, it should be an OMException |
| // indicating failure because the cluster was already prepared. |
| // If no exception is thrown, the volume should be created. |
| future.get(); |
| String volumeName = VOLUME + i; |
| Assert.assertTrue(clientProtocol.listVolumes(volumeName, "", 1) |
| .stream() |
| .anyMatch(vol -> vol.getName().equals(volumeName))); |
| } catch (ExecutionException ex) { |
| Throwable cause = ex.getCause(); |
| Assert.assertTrue(cause instanceof OMException); |
| Assert.assertEquals( |
| OMException.ResultCodes.NOT_SUPPORTED_OPERATION_WHEN_PREPARED, |
| ((OMException) cause).getResult()); |
| } |
| } |
| } |
| |
| // All tasks have completed at this point, so the executor can be shut down. |
| executorService.shutdown(); |
| } |
| |
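| /** |
| * Prepares the cluster, then cancels prepare and verifies that existing |
| * data is preserved, repeated cancels are no-ops, and new writes succeed. |
| */ |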
| @Test |
| public void testCancelPrepare() throws Exception { |
| String volumeName = VOLUME + UUID.randomUUID(); |
| Set<String> writtenKeys = writeKeysAndWaitForLogs(volumeName, 10); |
| long prepareIndex = submitPrepareRequest(); |
| |
| // Make sure all OMs are prepared and all OMs still have their data. |
| assertClusterPrepared(prepareIndex); |
| assertRatisLogsCleared(); |
| assertKeysWritten(volumeName, writtenKeys); |
| |
| // Cancel prepare and check that data is still present. |
| submitCancelPrepareRequest(); |
| assertClusterNotPrepared(); |
| assertKeysWritten(volumeName, writtenKeys); |
| |
| // Cancelling prepare again should have no effect. |
| submitCancelPrepareRequest(); |
| assertClusterNotPrepared(); |
| |
| // Write more data after cancelling prepare. |
| String volumeNameNew = VOLUME + UUID.randomUUID(); |
| writtenKeys = writeKeysAndWaitForLogs(volumeNameNew, 10); |
| |
| // Cancelling prepare again should have no effect and new data should be |
| // preserved. |
| submitCancelPrepareRequest(); |
| assertClusterNotPrepared(); |
| assertKeysWritten(volumeNameNew, writtenKeys); |
| } |
| |
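| /** |
| * Returns true if any Ratis log segment files are present in the given |
| * OM's Ratis storage directory. |
| */ |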
| private boolean logFilesPresentInRatisPeer(OzoneManager om) { |
| String ratisDir = om.getOmRatisServer().getServer().getProperties() |
| .get("raft.server.storage.dir"); |
| String groupIdDirName = |
| om.getOmRatisServer().getServer().getGroupIds().iterator() |
| .next().getUuid().toString(); |
| File logDir = Paths.get(ratisDir, groupIdDirName, "current") |
| .toFile(); |
| |
| File[] files = logDir.listFiles(); |
| if (files != null) { |
| for (File file : files) { |
| if (file.getName().startsWith("log")) { |
| return true; |
| } |
| } |
| } |
| return false; |
| } |
| |
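| /** |
| * Convenience overload that writes keys and waits for Ratis logs on every |
| * OM in the cluster. |
| */ |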
| private Set<String> writeKeysAndWaitForLogs(String volumeName, |
| int numKeys) throws Exception { |
| return writeKeysAndWaitForLogs(volumeName, numKeys, |
| cluster.getOzoneManagersList()); |
| } |
| |
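| /** |
| * Creates a volume and bucket, writes {@code numKeys} keys, and waits |
| * until each of the given OMs has Ratis log files on disk, so a later |
| * prepare can be checked to have purged them. |
| */ |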
| private Set<String> writeKeysAndWaitForLogs(String volumeName, int numKeys, |
| List<OzoneManager> ozoneManagers) throws Exception { |
| |
| store.createVolume(volumeName); |
| OzoneVolume volume = store.getVolume(volumeName); |
| volume.createBucket(BUCKET); |
| |
| Set<String> writtenKeys = new HashSet<>(); |
| for (int i = 1; i <= numKeys; i++) { |
| String keyName = KEY_PREFIX + i; |
| writeTestData(volumeName, BUCKET, keyName); |
| writtenKeys.add(keyName); |
| } |
| |
| // Make sure all OMs have logs from writing data, so we can check that |
| // they are purged after prepare. |
| for (OzoneManager om : ozoneManagers) { |
| LambdaTestUtils.await(WAIT_TIMEOUT_MILLIS, 1000, |
| () -> logFilesPresentInRatisPeer(om)); |
| } |
| |
| return writtenKeys; |
| } |
| |
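| /** |
| * Writes a single 100 byte key with RATIS/ONE replication. |
| */ |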
| private void writeTestData(String volumeName, |
| String bucketName, String keyName) throws Exception { |
| |
| String keyString = UUID.randomUUID().toString(); |
| byte[] data = ContainerTestHelper.getFixedLengthString( |
| keyString, 100).getBytes(UTF_8); |
| OzoneOutputStream keyStream = TestHelper.createKey( |
| keyName, ReplicationType.RATIS, ReplicationFactor.ONE, |
| 100, store, volumeName, bucketName); |
| keyStream.write(data); |
| keyStream.close(); |
| } |
| |
| private void assertKeysWritten(String volumeName, |
| Set<String> expectedKeys) throws Exception { |
| assertKeysWritten(volumeName, expectedKeys, cluster.getOzoneManagersList()); |
| } |
| |
| /** |
| * Checks that all provided OMs have {@code expectedKeys} in the volume |
| * {@code volumeName} and retries checking until the test timeout. |
| * All provided OMs are checked, not just a majority, so that we can |
| * test that downed OMs are able to make a full recovery after preparation, |
| * even though the cluster could appear healthy with just 2 OMs. |
| */ |
| private void assertKeysWritten(String volumeName, Set<String> expectedKeys, |
| List<OzoneManager> ozoneManagers) throws Exception { |
| for (OzoneManager om: ozoneManagers) { |
| // Wait for a potentially slow follower to apply all key writes. |
| LambdaTestUtils.await(WAIT_TIMEOUT_MILLIS, 1000, () -> { |
| List<OmKeyInfo> keys = om.getMetadataManager().listKeys(volumeName, |
| BUCKET, null, KEY_PREFIX, 100); |
| |
| boolean allKeysFound = (expectedKeys.size() == keys.size()); |
| if (!allKeysFound) { |
| LOG.info("In {} waiting for number of keys {} to equal " + |
| "expected number of keys {}.", om.getOMNodeId(), |
| keys.size(), expectedKeys.size()); |
| } else { |
| for (OmKeyInfo keyInfo : keys) { |
| if (!expectedKeys.contains(keyInfo.getKeyName())) { |
| allKeysFound = false; |
| LOG.info("In {} expected keys did not contain key {}", |
| om.getOMNodeId(), keyInfo.getKeyName()); |
| break; |
| } |
| } |
| } |
| |
| return allKeysFound; |
| }); |
| } |
| } |
| |
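| /** |
| * Submits a prepare request through the client and returns the log index |
| * at which the cluster was prepared. |
| */ |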
| private long submitPrepareRequest() throws Exception { |
| return clientProtocol.getOzoneManagerClient() |
| .prepareOzoneManager(PREPARE_FLUSH_WAIT_TIMEOUT_SECONDS, |
| PREPARE_FLUSH_INTERVAL_SECONDS); |
| } |
| |
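| /** |
| * Submits a cancel prepare request, returning the cluster to a writable |
| * state. |
| */ |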
| private void submitCancelPrepareRequest() throws Exception { |
| clientProtocol.getOzoneManagerClient().cancelOzoneManagerPrepare(); |
| } |
| |
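| /** |
| * Asserts that every OM in the cluster is prepared at or beyond |
| * {@code expectedPreparedIndex}, and that reads succeed while writes are |
| * rejected. |
| */ |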
| private void assertClusterPrepared(long expectedPreparedIndex) |
| throws Exception { |
| assertClusterPrepared(expectedPreparedIndex, |
| cluster.getOzoneManagersList()); |
| } |
| |
| private void assertClusterPrepared(long expectedPreparedIndex, |
| List<OzoneManager> ozoneManagers) throws Exception { |
| |
| for (OzoneManager om : ozoneManagers) { |
| // Wait for each OM to be running, with prepare completed at an index at |
| // or beyond the expected one. |
| LambdaTestUtils.await(WAIT_TIMEOUT_MILLIS, |
| 1000, () -> { |
| if (!om.isRunning()) { |
| LOG.info("{} is not yet started.", om.getOMNodeId()); |
| return false; |
| } else { |
| OzoneManagerPrepareState.State state = |
| om.getPrepareState().getState(); |
| |
| LOG.info("{} has prepare status: {} prepare index: {}.", |
| om.getOMNodeId(), state.getStatus(), state.getIndex()); |
| |
| return (state.getStatus() == PrepareStatus.PREPARE_COMPLETED) && |
| (state.getIndex() >= expectedPreparedIndex); |
| } |
| }); |
| } |
| |
| // Submitting a read request should pass. |
| clientProtocol.listVolumes(VOLUME, "", 100); |
| |
| // Submitting a write request should fail. |
| try { |
| clientProtocol.createVolume("vol"); |
| Assert.fail("Write request should fail when OM is in prepare mode."); |
| } catch (OMException ex) { |
| Assert.assertEquals( |
| OMException.ResultCodes.NOT_SUPPORTED_OPERATION_WHEN_PREPARED, |
| ex.getResult()); |
| } |
| } |
| |
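| /** |
| * Asserts that every OM in the cluster reports NOT_PREPARED, and that both |
| * reads and writes succeed. |
| */ |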
| private void assertClusterNotPrepared() throws Exception { |
| assertClusterNotPrepared(cluster.getOzoneManagersList()); |
| } |
| |
| private void assertClusterNotPrepared(List<OzoneManager> ozoneManagers) |
| throws Exception { |
| for (OzoneManager om : ozoneManagers) { |
| LambdaTestUtils.await(WAIT_TIMEOUT_MILLIS, |
| 1000, () -> { |
| if (!om.isRunning()) { |
| return false; |
| } else { |
| return om.getPrepareState().getState().getStatus() == |
| PrepareStatus.NOT_PREPARED; |
| } |
| }); |
| } |
| |
| // Submitting a read request should pass. |
| clientProtocol.listVolumes(VOLUME, "", 100); |
| |
| // Submitting a write request should also pass. |
| clientProtocol.createVolume("vol"); |
| clientProtocol.deleteVolume("vol"); |
| } |
| |
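| /** |
| * Asserts that the Ratis log files of every OM in the cluster are |
| * eventually purged. |
| */ |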
| private void assertRatisLogsCleared() throws Exception { |
| assertRatisLogsCleared(cluster.getOzoneManagersList()); |
| } |
| |
| private void assertRatisLogsCleared(List<OzoneManager> ozoneManagers) |
| throws Exception { |
| for (OzoneManager om: ozoneManagers) { |
| LambdaTestUtils.await(WAIT_TIMEOUT_MILLIS, 1000, |
| () -> !logFilesPresentInRatisPeer(om)); |
| } |
| } |
| |
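| /** |
| * Asserts that the cluster can be prepared again and its logs cleared. |
| */ |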
| private void assertShouldBeAbleToPrepare() throws Exception { |
| long prepareIndex = submitPrepareRequest(); |
| assertClusterPrepared(prepareIndex); |
| assertRatisLogsCleared(); |
| } |
| } |