| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with this |
| * work for additional information regarding copyright ownership. The ASF |
| * licenses this file to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * <p> |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * <p> |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| * License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| package org.apache.hadoop.ozone.om; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.nio.file.Path; |
| import java.util.ArrayList; |
| import java.util.List; |
| import java.util.UUID; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.Future; |
| |
| import org.apache.commons.lang3.RandomStringUtils; |
| import org.apache.hadoop.hdds.conf.OzoneConfiguration; |
| import org.apache.hadoop.hdds.conf.StorageUnit; |
| import org.apache.hadoop.hdds.utils.db.DBCheckpoint; |
| import org.apache.hadoop.ozone.MiniOzoneCluster; |
| import org.apache.hadoop.ozone.MiniOzoneHAClusterImpl; |
| import org.apache.hadoop.ozone.client.ObjectStore; |
| import org.apache.hadoop.ozone.client.OzoneBucket; |
| import org.apache.hadoop.ozone.client.OzoneClientFactory; |
| import org.apache.hadoop.ozone.client.OzoneKeyDetails; |
| import org.apache.hadoop.ozone.client.OzoneVolume; |
| import org.apache.hadoop.ozone.client.VolumeArgs; |
| import org.apache.hadoop.hdds.utils.TransactionInfo; |
| import org.apache.hadoop.ozone.client.io.OzoneInputStream; |
| import org.apache.hadoop.ozone.om.helpers.BucketLayout; |
| import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer; |
| import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils; |
| import org.apache.hadoop.hdds.ExitManager; |
| import org.apache.ozone.test.GenericTestUtils; |
| import org.apache.ratis.server.protocol.TermIndex; |
| |
| import static org.apache.hadoop.ozone.om.TestOzoneManagerHAWithData.createKey; |
| import static org.junit.Assert.assertTrue; |
| |
| import org.assertj.core.api.Fail; |
| import org.junit.Assert; |
| import org.junit.Ignore; |
| import org.junit.jupiter.api.AfterEach; |
| import org.junit.jupiter.api.BeforeEach; |
| import org.junit.jupiter.api.Test; |
| import org.junit.jupiter.api.Timeout; |
| import org.slf4j.Logger; |
| import org.slf4j.event.Level; |
| |
| /** |
| * Tests the Ratis snapshots feature in OM. |
| */ |
| @Timeout(5000) |
| public class TestOMRatisSnapshots { |
| |
| private MiniOzoneHAClusterImpl cluster = null; |
| private ObjectStore objectStore; |
| private OzoneConfiguration conf; |
| private String clusterId; |
| private String scmId; |
| private String omServiceId; |
| private int numOfOMs = 3; |
| private OzoneBucket ozoneBucket; |
| private String volumeName; |
| private String bucketName; |
| |
| private static final long SNAPSHOT_THRESHOLD = 50; |
| private static final int LOG_PURGE_GAP = 50; |
| |
| /** |
| * Create a MiniOzoneCluster for testing. The cluster initially has one |
| * inactive OM. So at the start of the cluster, there will be 2 active and 1 |
| * inactive OM. |
| * |
| * @throws IOException |
| */ |
| @BeforeEach |
| public void init() throws Exception { |
| conf = new OzoneConfiguration(); |
| clusterId = UUID.randomUUID().toString(); |
| scmId = UUID.randomUUID().toString(); |
| omServiceId = "om-service-test1"; |
| conf.setInt(OMConfigKeys.OZONE_OM_RATIS_LOG_PURGE_GAP, LOG_PURGE_GAP); |
| conf.setStorageSize(OMConfigKeys.OZONE_OM_RATIS_SEGMENT_SIZE_KEY, 16, |
| StorageUnit.KB); |
| conf.setStorageSize(OMConfigKeys. |
| OZONE_OM_RATIS_SEGMENT_PREALLOCATED_SIZE_KEY, 16, StorageUnit.KB); |
| conf.setLong( |
| OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_AUTO_TRIGGER_THRESHOLD_KEY, |
| SNAPSHOT_THRESHOLD); |
| cluster = (MiniOzoneHAClusterImpl) MiniOzoneCluster.newOMHABuilder(conf) |
| .setClusterId(clusterId) |
| .setScmId(scmId) |
| .setOMServiceId("om-service-test1") |
| .setNumOfOzoneManagers(numOfOMs) |
| .setNumOfActiveOMs(2) |
| .build(); |
| cluster.waitForClusterToBeReady(); |
| objectStore = OzoneClientFactory.getRpcClient(omServiceId, conf) |
| .getObjectStore(); |
| |
| volumeName = "volume" + RandomStringUtils.randomNumeric(5); |
| bucketName = "bucket" + RandomStringUtils.randomNumeric(5); |
| |
| VolumeArgs createVolumeArgs = VolumeArgs.newBuilder() |
| .setOwner("user" + RandomStringUtils.randomNumeric(5)) |
| .setAdmin("admin" + RandomStringUtils.randomNumeric(5)) |
| .build(); |
| |
| objectStore.createVolume(volumeName, createVolumeArgs); |
| OzoneVolume retVolumeinfo = objectStore.getVolume(volumeName); |
| |
| retVolumeinfo.createBucket(bucketName); |
| ozoneBucket = retVolumeinfo.getBucket(bucketName); |
| } |
| |
| /** |
| * Shutdown MiniDFSCluster. |
| */ |
| @AfterEach |
| public void shutdown() { |
| if (cluster != null) { |
| cluster.shutdown(); |
| } |
| } |
| |
| @Test |
| public void testInstallSnapshot() throws Exception { |
| // Get the leader OM |
| String leaderOMNodeId = OmFailoverProxyUtil |
| .getFailoverProxyProvider(objectStore.getClientProxy()) |
| .getCurrentProxyOMNodeId(); |
| |
| OzoneManager leaderOM = cluster.getOzoneManager(leaderOMNodeId); |
| OzoneManagerRatisServer leaderRatisServer = leaderOM.getOmRatisServer(); |
| |
| // Find the inactive OM |
| String followerNodeId = leaderOM.getPeerNodes().get(0).getNodeId(); |
| if (cluster.isOMActive(followerNodeId)) { |
| followerNodeId = leaderOM.getPeerNodes().get(1).getNodeId(); |
| } |
| OzoneManager followerOM = cluster.getOzoneManager(followerNodeId); |
| |
| // Do some transactions so that the log index increases |
| List<String> keys = writeKeysToIncreaseLogIndex(leaderRatisServer, 200); |
| |
| // Get the latest db checkpoint from the leader OM. |
| TransactionInfo transactionInfo = |
| TransactionInfo.readTransactionInfo(leaderOM.getMetadataManager()); |
| TermIndex leaderOMTermIndex = |
| TermIndex.valueOf(transactionInfo.getTerm(), |
| transactionInfo.getTransactionIndex()); |
| long leaderOMSnapshotIndex = leaderOMTermIndex.getIndex(); |
| long leaderOMSnapshotTermIndex = leaderOMTermIndex.getTerm(); |
| |
| // Start the inactive OM. Checkpoint installation will happen spontaneously. |
| cluster.startInactiveOM(followerNodeId); |
| GenericTestUtils.LogCapturer logCapture = |
| GenericTestUtils.LogCapturer.captureLogs(OzoneManager.LOG); |
| |
| // The recently started OM should be lagging behind the leader OM. |
| // Wait & for follower to update transactions to leader snapshot index. |
| // Timeout error if follower does not load update within 3s |
| GenericTestUtils.waitFor(() -> { |
| return followerOM.getOmRatisServer().getLastAppliedTermIndex().getIndex() |
| >= leaderOMSnapshotIndex - 1; |
| }, 100, 3000); |
| |
| long followerOMLastAppliedIndex = |
| followerOM.getOmRatisServer().getLastAppliedTermIndex().getIndex(); |
| assertTrue( |
| followerOMLastAppliedIndex >= leaderOMSnapshotIndex - 1); |
| |
| // After the new checkpoint is installed, the follower OM |
| // lastAppliedIndex must >= the snapshot index of the checkpoint. It |
| // could be great than snapshot index if there is any conf entry from ratis. |
| followerOMLastAppliedIndex = followerOM.getOmRatisServer() |
| .getLastAppliedTermIndex().getIndex(); |
| assertTrue(followerOMLastAppliedIndex >= leaderOMSnapshotIndex); |
| assertTrue(followerOM.getOmRatisServer().getLastAppliedTermIndex() |
| .getTerm() >= leaderOMSnapshotTermIndex); |
| |
| // Verify checkpoint installation was happened. |
| String msg = "Reloaded OM state"; |
| Assert.assertTrue(logCapture.getOutput().contains(msg)); |
| |
| // Verify that the follower OM's DB contains the transactions which were |
| // made while it was inactive. |
| OMMetadataManager followerOMMetaMngr = followerOM.getMetadataManager(); |
| Assert.assertNotNull(followerOMMetaMngr.getVolumeTable().get( |
| followerOMMetaMngr.getVolumeKey(volumeName))); |
| Assert.assertNotNull(followerOMMetaMngr.getBucketTable().get( |
| followerOMMetaMngr.getBucketKey(volumeName, bucketName))); |
| for (String key : keys) { |
| Assert.assertNotNull(followerOMMetaMngr.getKeyTable( |
| getDefaultBucketLayout()) |
| .get(followerOMMetaMngr.getOzoneKey(volumeName, bucketName, key))); |
| } |
| |
| // Verify RPC server is running |
| GenericTestUtils.waitFor(() -> { |
| return followerOM.isOmRpcServerRunning(); |
| }, 100, 5000); |
| |
| Assert.assertTrue(logCapture.getOutput().contains( |
| "Install Checkpoint is finished")); |
| |
| // Read & Write after snapshot installed. |
| List<String> newKeys = writeKeys(1); |
| readKeys(newKeys); |
| // TODO: Enable this part after RATIS-1481 used |
| /* |
| Assert.assertNotNull(followerOMMetaMngr.getKeyTable( |
| getDefaultBucketLayout()).get(followerOMMetaMngr.getOzoneKey( |
| volumeName, bucketName, newKeys.get(0)))); |
| */ |
| } |
| |
| @Ignore("Enable this unit test after RATIS-1481 used") |
| public void testInstallSnapshotWithClientWrite() throws Exception { |
| // Get the leader OM |
| String leaderOMNodeId = OmFailoverProxyUtil |
| .getFailoverProxyProvider(objectStore.getClientProxy()) |
| .getCurrentProxyOMNodeId(); |
| |
| OzoneManager leaderOM = cluster.getOzoneManager(leaderOMNodeId); |
| OzoneManagerRatisServer leaderRatisServer = leaderOM.getOmRatisServer(); |
| |
| // Find the inactive OM |
| String followerNodeId = leaderOM.getPeerNodes().get(0).getNodeId(); |
| if (cluster.isOMActive(followerNodeId)) { |
| followerNodeId = leaderOM.getPeerNodes().get(1).getNodeId(); |
| } |
| OzoneManager followerOM = cluster.getOzoneManager(followerNodeId); |
| |
| // Do some transactions so that the log index increases |
| List<String> keys = writeKeysToIncreaseLogIndex(leaderRatisServer, 200); |
| |
| // Get the latest db checkpoint from the leader OM. |
| TransactionInfo transactionInfo = |
| TransactionInfo.readTransactionInfo(leaderOM.getMetadataManager()); |
| TermIndex leaderOMTermIndex = |
| TermIndex.valueOf(transactionInfo.getTerm(), |
| transactionInfo.getTransactionIndex()); |
| long leaderOMSnapshotIndex = leaderOMTermIndex.getIndex(); |
| long leaderOMSnapshotTermIndex = leaderOMTermIndex.getTerm(); |
| |
| // Start the inactive OM. Checkpoint installation will happen spontaneously. |
| cluster.startInactiveOM(followerNodeId); |
| GenericTestUtils.LogCapturer logCapture = |
| GenericTestUtils.LogCapturer.captureLogs(OzoneManager.LOG); |
| |
| // Continuously create new keys |
| ExecutorService executor = Executors.newFixedThreadPool(1); |
| Future<List<String>> writeFuture = executor.submit(() -> { |
| return writeKeys(200); |
| }); |
| List<String> newKeys = writeFuture.get(); |
| |
| // Wait checkpoint installation to finish |
| Thread.sleep(5000); |
| |
| // The recently started OM should be lagging behind the leader OM. |
| // Wait & for follower to update transactions to leader snapshot index. |
| // Timeout error if follower does not load update within 3s |
| GenericTestUtils.waitFor(() -> { |
| return followerOM.getOmRatisServer().getLastAppliedTermIndex().getIndex() |
| >= leaderOMSnapshotIndex - 1; |
| }, 100, 3000); |
| |
| // Verify checkpoint installation was happened. |
| String msg = "Reloaded OM state"; |
| Assert.assertTrue(logCapture.getOutput().contains(msg)); |
| Assert.assertTrue(logCapture.getOutput().contains( |
| "Install Checkpoint is finished")); |
| |
| long followerOMLastAppliedIndex = |
| followerOM.getOmRatisServer().getLastAppliedTermIndex().getIndex(); |
| assertTrue( |
| followerOMLastAppliedIndex >= leaderOMSnapshotIndex - 1); |
| |
| // After the new checkpoint is installed, the follower OM |
| // lastAppliedIndex must >= the snapshot index of the checkpoint. It |
| // could be great than snapshot index if there is any conf entry from ratis. |
| followerOMLastAppliedIndex = followerOM.getOmRatisServer() |
| .getLastAppliedTermIndex().getIndex(); |
| assertTrue(followerOMLastAppliedIndex >= leaderOMSnapshotIndex); |
| assertTrue(followerOM.getOmRatisServer().getLastAppliedTermIndex() |
| .getTerm() >= leaderOMSnapshotTermIndex); |
| |
| // Verify that the follower OM's DB contains the transactions which were |
| // made while it was inactive. |
| OMMetadataManager followerOMMetaMgr = followerOM.getMetadataManager(); |
| Assert.assertNotNull(followerOMMetaMgr.getVolumeTable().get( |
| followerOMMetaMgr.getVolumeKey(volumeName))); |
| Assert.assertNotNull(followerOMMetaMgr.getBucketTable().get( |
| followerOMMetaMgr.getBucketKey(volumeName, bucketName))); |
| for (String key : keys) { |
| Assert.assertNotNull(followerOMMetaMgr.getKeyTable( |
| getDefaultBucketLayout()) |
| .get(followerOMMetaMgr.getOzoneKey(volumeName, bucketName, key))); |
| } |
| OMMetadataManager leaderOmMetaMgr = leaderOM.getMetadataManager(); |
| for (String key : newKeys) { |
| Assert.assertNotNull(leaderOmMetaMgr.getKeyTable( |
| getDefaultBucketLayout()) |
| .get(followerOMMetaMgr.getOzoneKey(volumeName, bucketName, key))); |
| } |
| Thread.sleep(5000); |
| followerOMMetaMgr = followerOM.getMetadataManager(); |
| for (String key : newKeys) { |
| Assert.assertNotNull(followerOMMetaMgr.getKeyTable( |
| getDefaultBucketLayout()) |
| .get(followerOMMetaMgr.getOzoneKey(volumeName, bucketName, key))); |
| } |
| // Read newly created keys |
| readKeys(newKeys); |
| System.out.println("All data are replicated"); |
| } |
| |
| @Test |
| public void testInstallSnapshotWithClientRead() throws Exception { |
| // Get the leader OM |
| String leaderOMNodeId = OmFailoverProxyUtil |
| .getFailoverProxyProvider(objectStore.getClientProxy()) |
| .getCurrentProxyOMNodeId(); |
| |
| OzoneManager leaderOM = cluster.getOzoneManager(leaderOMNodeId); |
| OzoneManagerRatisServer leaderRatisServer = leaderOM.getOmRatisServer(); |
| |
| // Find the inactive OM |
| String followerNodeId = leaderOM.getPeerNodes().get(0).getNodeId(); |
| if (cluster.isOMActive(followerNodeId)) { |
| followerNodeId = leaderOM.getPeerNodes().get(1).getNodeId(); |
| } |
| OzoneManager followerOM = cluster.getOzoneManager(followerNodeId); |
| |
| // Do some transactions so that the log index increases |
| List<String> keys = writeKeysToIncreaseLogIndex(leaderRatisServer, 200); |
| |
| // Get transaction Index |
| TransactionInfo transactionInfo = |
| TransactionInfo.readTransactionInfo(leaderOM.getMetadataManager()); |
| TermIndex leaderOMTermIndex = |
| TermIndex.valueOf(transactionInfo.getTerm(), |
| transactionInfo.getTransactionIndex()); |
| long leaderOMSnapshotIndex = leaderOMTermIndex.getIndex(); |
| long leaderOMSnapshotTermIndex = leaderOMTermIndex.getTerm(); |
| |
| // Start the inactive OM. Checkpoint installation will happen spontaneously. |
| cluster.startInactiveOM(followerNodeId); |
| GenericTestUtils.LogCapturer logCapture = |
| GenericTestUtils.LogCapturer.captureLogs(OzoneManager.LOG); |
| |
| // Continuously read keys |
| ExecutorService executor = Executors.newFixedThreadPool(1); |
| Future<Void> readFuture = executor.submit(() -> { |
| try { |
| getKeys(keys, 10); |
| readKeys(keys); |
| } catch (IOException e) { |
| Fail.fail("Read Key failed", e); |
| } |
| return null; |
| }); |
| readFuture.get(); |
| |
| // The recently started OM should be lagging behind the leader OM. |
| // Wait & for follower to update transactions to leader snapshot index. |
| // Timeout error if follower does not load update within 3s |
| GenericTestUtils.waitFor(() -> { |
| return followerOM.getOmRatisServer().getLastAppliedTermIndex().getIndex() |
| >= leaderOMSnapshotIndex - 1; |
| }, 100, 3000); |
| |
| long followerOMLastAppliedIndex = |
| followerOM.getOmRatisServer().getLastAppliedTermIndex().getIndex(); |
| assertTrue( |
| followerOMLastAppliedIndex >= leaderOMSnapshotIndex - 1); |
| |
| // After the new checkpoint is installed, the follower OM |
| // lastAppliedIndex must >= the snapshot index of the checkpoint. It |
| // could be great than snapshot index if there is any conf entry from ratis. |
| followerOMLastAppliedIndex = followerOM.getOmRatisServer() |
| .getLastAppliedTermIndex().getIndex(); |
| assertTrue(followerOMLastAppliedIndex >= leaderOMSnapshotIndex); |
| assertTrue(followerOM.getOmRatisServer().getLastAppliedTermIndex() |
| .getTerm() >= leaderOMSnapshotTermIndex); |
| |
| // Verify that the follower OM's DB contains the transactions which were |
| // made while it was inactive. |
| OMMetadataManager followerOMMetaMngr = followerOM.getMetadataManager(); |
| Assert.assertNotNull(followerOMMetaMngr.getVolumeTable().get( |
| followerOMMetaMngr.getVolumeKey(volumeName))); |
| Assert.assertNotNull(followerOMMetaMngr.getBucketTable().get( |
| followerOMMetaMngr.getBucketKey(volumeName, bucketName))); |
| for (String key : keys) { |
| Assert.assertNotNull(followerOMMetaMngr.getKeyTable( |
| getDefaultBucketLayout()) |
| .get(followerOMMetaMngr.getOzoneKey(volumeName, bucketName, key))); |
| } |
| |
| // Wait installation finish |
| Thread.sleep(5000); |
| // Verify checkpoint installation was happened. |
| Assert.assertTrue(logCapture.getOutput().contains("Reloaded OM state")); |
| Assert.assertTrue(logCapture.getOutput().contains( |
| "Install Checkpoint is finished")); |
| } |
| |
| @Test |
| public void testInstallOldCheckpointFailure() throws Exception { |
| // Get the leader OM |
| String leaderOMNodeId = OmFailoverProxyUtil |
| .getFailoverProxyProvider(objectStore.getClientProxy()) |
| .getCurrentProxyOMNodeId(); |
| |
| OzoneManager leaderOM = cluster.getOzoneManager(leaderOMNodeId); |
| |
| // Find the inactive OM and start it |
| String followerNodeId = leaderOM.getPeerNodes().get(0).getNodeId(); |
| if (cluster.isOMActive(followerNodeId)) { |
| followerNodeId = leaderOM.getPeerNodes().get(1).getNodeId(); |
| } |
| cluster.startInactiveOM(followerNodeId); |
| GenericTestUtils.setLogLevel(OzoneManager.LOG, Level.INFO); |
| GenericTestUtils.LogCapturer logCapture = |
| GenericTestUtils.LogCapturer.captureLogs(OzoneManager.LOG); |
| |
| OzoneManager followerOM = cluster.getOzoneManager(followerNodeId); |
| OzoneManagerRatisServer followerRatisServer = followerOM.getOmRatisServer(); |
| |
| // Do some transactions so that the log index increases on follower OM |
| writeKeysToIncreaseLogIndex(followerRatisServer, 100); |
| |
| TermIndex leaderCheckpointTermIndex = leaderOM.getOmRatisServer() |
| .getLastAppliedTermIndex(); |
| DBCheckpoint leaderDbCheckpoint = leaderOM.getMetadataManager().getStore() |
| .getCheckpoint(false); |
| |
| // Do some more transactions to increase the log index further on |
| // follower OM such that it is more than the checkpoint index taken on |
| // leader OM. |
| writeKeysToIncreaseLogIndex(followerOM.getOmRatisServer(), |
| leaderCheckpointTermIndex.getIndex() + 100); |
| |
| // Install the old checkpoint on the follower OM. This should fail as the |
| // followerOM is already ahead of that transactionLogIndex and the OM |
| // state should be reloaded. |
| TermIndex followerTermIndex = followerRatisServer.getLastAppliedTermIndex(); |
| TermIndex newTermIndex = followerOM.installCheckpoint( |
| leaderOMNodeId, leaderDbCheckpoint); |
| |
| String errorMsg = "Cannot proceed with InstallSnapshot as OM is at " + |
| "TermIndex " + followerTermIndex + " and checkpoint has lower " + |
| "TermIndex"; |
| Assert.assertTrue(logCapture.getOutput().contains(errorMsg)); |
| Assert.assertNull("OM installed checkpoint even though checkpoint " + |
| "logIndex is less than it's lastAppliedIndex", newTermIndex); |
| Assert.assertEquals(followerTermIndex, |
| followerRatisServer.getLastAppliedTermIndex()); |
| String msg = "OM DB is not stopped. Started services with Term: " + |
| followerTermIndex.getTerm() + " and Index: " + |
| followerTermIndex.getIndex(); |
| Assert.assertTrue(logCapture.getOutput().contains(msg)); |
| } |
| |
| @Test |
| public void testInstallCorruptedCheckpointFailure() throws Exception { |
| // Get the leader OM |
| String leaderOMNodeId = OmFailoverProxyUtil |
| .getFailoverProxyProvider(objectStore.getClientProxy()) |
| .getCurrentProxyOMNodeId(); |
| |
| OzoneManager leaderOM = cluster.getOzoneManager(leaderOMNodeId); |
| OzoneManagerRatisServer leaderRatisServer = leaderOM.getOmRatisServer(); |
| |
| // Find the inactive OM |
| String followerNodeId = leaderOM.getPeerNodes().get(0).getNodeId(); |
| if (cluster.isOMActive(followerNodeId)) { |
| followerNodeId = leaderOM.getPeerNodes().get(1).getNodeId(); |
| } |
| OzoneManager followerOM = cluster.getOzoneManager(followerNodeId); |
| |
| // Do some transactions so that the log index increases |
| writeKeysToIncreaseLogIndex(leaderRatisServer, 100); |
| |
| DBCheckpoint leaderDbCheckpoint = leaderOM.getMetadataManager().getStore() |
| .getCheckpoint(false); |
| Path leaderCheckpointLocation = leaderDbCheckpoint.getCheckpointLocation(); |
| TransactionInfo leaderCheckpointTrxnInfo = OzoneManagerRatisUtils |
| .getTrxnInfoFromCheckpoint(conf, leaderCheckpointLocation); |
| |
| // Corrupt the leader checkpoint and install that on the OM. The |
| // operation should fail and OM should shutdown. |
| boolean delete = true; |
| for (File file : leaderCheckpointLocation.toFile() |
| .listFiles()) { |
| if (file.getName().contains(".sst")) { |
| if (delete) { |
| file.delete(); |
| delete = false; |
| } else { |
| delete = true; |
| } |
| } |
| } |
| |
| GenericTestUtils.setLogLevel(OzoneManager.LOG, Level.INFO); |
| GenericTestUtils.LogCapturer logCapture = |
| GenericTestUtils.LogCapturer.captureLogs(OzoneManager.LOG); |
| followerOM.setExitManagerForTesting(new DummyExitManager()); |
| // Install corrupted checkpoint |
| followerOM.installCheckpoint(leaderOMNodeId, leaderCheckpointLocation, |
| leaderCheckpointTrxnInfo); |
| |
| // Wait checkpoint installation to be finished. |
| GenericTestUtils.waitFor(() -> { |
| Assert.assertTrue(logCapture.getOutput().contains("System Exit: " + |
| "Failed to reload OM state and instantiate services.")); |
| return true; |
| }, 100, 3000); |
| String msg = "RPC server is stopped"; |
| Assert.assertTrue(logCapture.getOutput().contains(msg)); |
| } |
| |
| private List<String> writeKeysToIncreaseLogIndex( |
| OzoneManagerRatisServer omRatisServer, long targetLogIndex) |
| throws IOException, InterruptedException { |
| List<String> keys = new ArrayList<>(); |
| long logIndex = omRatisServer.getLastAppliedTermIndex().getIndex(); |
| while (logIndex < targetLogIndex) { |
| keys.add(createKey(ozoneBucket)); |
| Thread.sleep(100); |
| logIndex = omRatisServer.getLastAppliedTermIndex().getIndex(); |
| } |
| return keys; |
| } |
| |
| private List<String> writeKeys(long keyCount) throws IOException, |
| InterruptedException { |
| List<String> keys = new ArrayList<>(); |
| long index = 0; |
| while (index < keyCount) { |
| keys.add(createKey(ozoneBucket)); |
| index++; |
| } |
| return keys; |
| } |
| |
| private void getKeys(List<String> keys, int round) throws IOException { |
| while (round > 0) { |
| for (String keyName : keys) { |
| OzoneKeyDetails key = ozoneBucket.getKey(keyName); |
| Assert.assertEquals(keyName, key.getName()); |
| } |
| round--; |
| } |
| } |
| |
| private void readKeys(List<String> keys) throws IOException { |
| for (String keyName : keys) { |
| OzoneInputStream inputStream = ozoneBucket.readKey(keyName); |
| byte[] data = new byte[100]; |
| inputStream.read(data, 0, 100); |
| inputStream.close(); |
| } |
| } |
| |
| private static BucketLayout getDefaultBucketLayout() { |
| return BucketLayout.DEFAULT; |
| } |
| |
| private static class DummyExitManager extends ExitManager { |
| @Override |
| public void exitSystem(int status, String message, Throwable throwable, |
| Logger log) { |
| log.error("System Exit: " + message, throwable); |
| } |
| } |
| } |