| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * <p> |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * <p> |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.ozone.om; |
| |
| import javax.servlet.ServletContext; |
| import javax.servlet.ServletInputStream; |
| import javax.servlet.ServletOutputStream; |
| import javax.servlet.WriteListener; |
| import javax.servlet.http.HttpServletRequest; |
| import javax.servlet.http.HttpServletResponse; |
| import java.io.ByteArrayInputStream; |
| import java.io.File; |
| import java.io.FileOutputStream; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.io.OutputStream; |
| import java.io.OutputStreamWriter; |
| import java.nio.charset.StandardCharsets; |
| import java.nio.file.Files; |
| import java.nio.file.Path; |
| import java.nio.file.Paths; |
| import java.security.Principal; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.Iterator; |
| import java.util.LinkedHashSet; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Set; |
| import java.util.UUID; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.TimeoutException; |
| import java.util.concurrent.atomic.AtomicReference; |
| import java.util.stream.Collectors; |
| import java.util.stream.Stream; |
| |
| import com.google.common.collect.Sets; |
| import org.apache.hadoop.fs.FileUtil; |
| import org.apache.hadoop.hdds.conf.OzoneConfiguration; |
| import org.apache.hadoop.hdds.utils.db.DBCheckpoint; |
| import org.apache.hadoop.hdds.utils.db.DBStore; |
| import org.apache.hadoop.ozone.MiniOzoneCluster; |
| import org.apache.hadoop.ozone.OzoneConsts; |
| import org.apache.hadoop.ozone.TestDataUtil; |
| import org.apache.hadoop.ozone.client.OzoneBucket; |
| import org.apache.hadoop.ozone.lock.BootstrapStateHandler; |
| import org.apache.hadoop.ozone.om.helpers.SnapshotInfo; |
| import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol; |
| import org.apache.hadoop.security.UserGroupInformation; |
| |
| |
| import static org.apache.hadoop.hdds.recon.ReconConfig.ConfigStrings.OZONE_RECON_KERBEROS_PRINCIPAL_KEY; |
| import static org.apache.hadoop.hdds.utils.HddsServerUtil.OZONE_RATIS_SNAPSHOT_COMPLETE_FLAG_NAME; |
| import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED; |
| import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ADMINISTRATORS; |
| import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ADMINISTRATORS_WILDCARD; |
| import static org.apache.hadoop.ozone.OzoneConsts.MULTIPART_FORM_DATA_BOUNDARY; |
| import static org.apache.hadoop.ozone.OzoneConsts.OM_DB_NAME; |
| import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX; |
| import static org.apache.hadoop.ozone.OzoneConsts.DB_COMPACTION_SST_BACKUP_DIR; |
| import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_DIFF_DIR; |
| import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_DIR; |
| import static org.apache.hadoop.ozone.OzoneConsts.OZONE_DB_CHECKPOINT_INCLUDE_SNAPSHOT_DATA; |
| import static org.apache.hadoop.ozone.OzoneConsts.OZONE_DB_CHECKPOINT_REQUEST_FLUSH; |
| import static org.apache.hadoop.ozone.OzoneConsts.OZONE_DB_CHECKPOINT_REQUEST_TO_EXCLUDE_SST; |
| import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_HTTP_AUTH_TYPE; |
| |
| import org.apache.ozone.test.GenericTestUtils; |
| |
| import org.junit.jupiter.api.AfterEach; |
| import org.junit.jupiter.api.BeforeEach; |
| import org.junit.jupiter.api.Test; |
| import org.junit.jupiter.api.Timeout; |
| import org.junit.jupiter.api.io.TempDir; |
| import org.junit.jupiter.params.ParameterizedTest; |
| import org.junit.jupiter.params.provider.Arguments; |
| import org.junit.jupiter.params.provider.MethodSource; |
| |
| import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_ENABLE_KEY; |
| import static org.apache.hadoop.ozone.om.OmSnapshotManager.OM_HARDLINK_FILE; |
| import static org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils.DATA_PREFIX; |
| import static org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils.DATA_SUFFIX; |
| import static org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils.truncateFileName; |
| import static org.apache.hadoop.ozone.om.OmSnapshotManager.getSnapshotPath; |
| import static org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer.COMPACTION_LOG_FILE_NAME_SUFFIX; |
| import static org.assertj.core.api.Assertions.assertThat; |
| import static org.junit.jupiter.api.Assertions.assertEquals; |
| import static org.junit.jupiter.api.Assertions.assertFalse; |
| import static org.junit.jupiter.api.Assertions.assertNotEquals; |
| import static org.junit.jupiter.api.Assertions.assertNotNull; |
| import static org.junit.jupiter.api.Assertions.assertNull; |
| import static org.junit.jupiter.api.Assertions.assertThrows; |
| import static org.junit.jupiter.api.Assertions.assertTrue; |
| import static org.junit.jupiter.params.provider.Arguments.arguments; |
| import static org.mockito.Mockito.any; |
| import static org.mockito.Mockito.anyBoolean; |
| import static org.mockito.Mockito.anyInt; |
| import static org.mockito.Mockito.anyString; |
| import static org.mockito.Mockito.doCallRealMethod; |
| import static org.mockito.Mockito.doNothing; |
| import static org.mockito.Mockito.doReturn; |
| import static org.mockito.Mockito.eq; |
| import static org.mockito.Mockito.mock; |
| import static org.mockito.Mockito.spy; |
| import static org.mockito.Mockito.times; |
| import static org.mockito.Mockito.verify; |
| import static org.mockito.Mockito.when; |
| |
| /** |
| * Class used for testing the OM DB Checkpoint provider servlet. |
| */ |
| @Timeout(240) |
| public class TestOMDbCheckpointServlet { |
| public static final String JAVA_IO_TMPDIR = "java.io.tmpdir"; |
| private OzoneConfiguration conf; |
| private File tempFile; |
| private ServletOutputStream servletOutputStream; |
| private MiniOzoneCluster cluster = null; |
| private OMMetrics omMetrics = null; |
| private HttpServletRequest requestMock = null; |
| private HttpServletResponse responseMock = null; |
| private OMDBCheckpointServlet omDbCheckpointServletMock = null; |
| private File metaDir; |
| private String snapshotDirName; |
| private String snapshotDirName2; |
| private Path compactionDirPath; |
| private DBCheckpoint dbCheckpoint; |
| private String method; |
| @TempDir |
| private Path folder; |
| private static final String FABRICATED_FILE_NAME = "fabricatedFile.sst"; |
| |
| /** |
| * Create a MiniDFSCluster for testing. |
| * <p> |
| * Ozone is made active by setting OZONE_ENABLED = true |
| * |
| * @throws Exception |
| */ |
| @BeforeEach |
| void init() throws Exception { |
| conf = new OzoneConfiguration(); |
| |
| final Path tempPath = folder.resolve("temp.tar"); |
| tempFile = tempPath.toFile(); |
| |
| servletOutputStream = new ServletOutputStream() { |
| private final OutputStream fileOutputStream = Files.newOutputStream(tempPath); |
| |
| @Override |
| public boolean isReady() { |
| return true; |
| } |
| |
| @Override |
| public void setWriteListener(WriteListener writeListener) { |
| } |
| |
| @Override |
| public void close() throws IOException { |
| fileOutputStream.close(); |
| super.close(); |
| } |
| |
| @Override |
| public void write(int b) throws IOException { |
| fileOutputStream.write(b); |
| } |
| }; |
| } |
| |
| /** |
| * Shutdown MiniDFSCluster. |
| */ |
| @AfterEach |
| public void shutdown() throws InterruptedException { |
| if (cluster != null) { |
| cluster.shutdown(); |
| } |
| } |
| |
| private void setupCluster() throws Exception { |
| cluster = MiniOzoneCluster.newBuilder(conf) |
| .setNumDatanodes(1) |
| .build(); |
| cluster.waitForClusterToBeReady(); |
| omMetrics = cluster.getOzoneManager().getMetrics(); |
| |
| omDbCheckpointServletMock = |
| mock(OMDBCheckpointServlet.class); |
| |
| BootstrapStateHandler.Lock lock = |
| new OMDBCheckpointServlet.Lock(cluster.getOzoneManager()); |
| doCallRealMethod().when(omDbCheckpointServletMock).init(); |
| assertNull( |
| doCallRealMethod().when(omDbCheckpointServletMock).getDbStore()); |
| |
| requestMock = mock(HttpServletRequest.class); |
| // Return current user short name when asked |
| when(requestMock.getRemoteUser()) |
| .thenReturn(UserGroupInformation.getCurrentUser().getShortUserName()); |
| responseMock = mock(HttpServletResponse.class); |
| |
| ServletContext servletContextMock = mock(ServletContext.class); |
| when(omDbCheckpointServletMock.getServletContext()) |
| .thenReturn(servletContextMock); |
| |
| when(servletContextMock.getAttribute(OzoneConsts.OM_CONTEXT_ATTRIBUTE)) |
| .thenReturn(cluster.getOzoneManager()); |
| when(requestMock.getParameter(OZONE_DB_CHECKPOINT_REQUEST_FLUSH)) |
| .thenReturn("true"); |
| |
| doCallRealMethod().when(omDbCheckpointServletMock).doGet(requestMock, |
| responseMock); |
| doCallRealMethod().when(omDbCheckpointServletMock).doPost(requestMock, |
| responseMock); |
| |
| doCallRealMethod().when(omDbCheckpointServletMock) |
| .writeDbDataToStream(any(), any(), any(), any(), any(), any()); |
| |
| when(omDbCheckpointServletMock.getBootstrapStateLock()) |
| .thenReturn(lock); |
| |
| doCallRealMethod().when(omDbCheckpointServletMock).getCheckpoint(any(), |
| anyBoolean()); |
| } |
| |
| @ParameterizedTest |
| @MethodSource("getHttpMethods") |
| public void testEndpoint(String httpMethod) throws Exception { |
| this.method = httpMethod; |
| |
| conf.setBoolean(OZONE_ACL_ENABLED, false); |
| conf.set(OZONE_ADMINISTRATORS, OZONE_ADMINISTRATORS_WILDCARD); |
| |
| setupCluster(); |
| |
| final OzoneManager om = cluster.getOzoneManager(); |
| doCallRealMethod().when(omDbCheckpointServletMock).initialize( |
| om.getMetadataManager().getStore(), |
| om.getMetrics().getDBCheckpointMetrics(), |
| om.getAclsEnabled(), |
| om.getOmAdminUsernames(), |
| om.getOmAdminGroups(), |
| om.isSpnegoEnabled()); |
| |
| doNothing().when(responseMock).setContentType("application/x-tar"); |
| doNothing().when(responseMock).setHeader(anyString(), anyString()); |
| |
| List<String> toExcludeList = new ArrayList<>(); |
| toExcludeList.add("sstFile1.sst"); |
| toExcludeList.add("sstFile2.sst"); |
| |
| setupHttpMethod(toExcludeList); |
| |
| when(responseMock.getOutputStream()).thenReturn(servletOutputStream); |
| |
| omDbCheckpointServletMock.init(); |
| long initialCheckpointCount = |
| omMetrics.getDBCheckpointMetrics().getNumCheckpoints(); |
| |
| doEndpoint(); |
| |
| assertThat(tempFile.length()).isGreaterThan(0); |
| assertThat(omMetrics.getDBCheckpointMetrics().getLastCheckpointCreationTimeTaken()) |
| .isGreaterThan(0); |
| assertThat(omMetrics.getDBCheckpointMetrics().getLastCheckpointStreamingTimeTaken()) |
| .isGreaterThan(0); |
| assertThat(omMetrics.getDBCheckpointMetrics().getNumCheckpoints()) |
| .isGreaterThan(initialCheckpointCount); |
| |
| verify(omDbCheckpointServletMock).writeDbDataToStream(any(), |
| any(), any(), eq(toExcludeList), any(), any()); |
| } |
| |
| @ParameterizedTest |
| @MethodSource("getHttpMethods") |
| public void testEndpointNotRatis(String httpMethod) throws Exception { |
| conf.setBoolean(OZONE_OM_RATIS_ENABLE_KEY, false); |
| testEndpoint(httpMethod); |
| } |
| |
| @Test |
| public void testDoPostWithInvalidContentType() throws Exception { |
| conf.setBoolean(OZONE_ACL_ENABLED, false); |
| conf.set(OZONE_ADMINISTRATORS, OZONE_ADMINISTRATORS_WILDCARD); |
| |
| setupCluster(); |
| |
| final OzoneManager om = cluster.getOzoneManager(); |
| |
| doCallRealMethod().when(omDbCheckpointServletMock).initialize( |
| om.getMetadataManager().getStore(), |
| om.getMetrics().getDBCheckpointMetrics(), |
| om.getAclsEnabled(), |
| om.getOmAdminUsernames(), |
| om.getOmAdminGroups(), |
| om.isSpnegoEnabled()); |
| |
| when(requestMock.getContentType()).thenReturn("application/json"); |
| doNothing().when(responseMock).setContentType("application/x-tar"); |
| doNothing().when(responseMock).setHeader(anyString(), |
| anyString()); |
| |
| when(responseMock.getOutputStream()).thenReturn(servletOutputStream); |
| |
| omDbCheckpointServletMock.init(); |
| omDbCheckpointServletMock.doPost(requestMock, responseMock); |
| |
| verify(responseMock).setStatus(HttpServletResponse.SC_BAD_REQUEST); |
| } |
| |
| @ParameterizedTest |
| @MethodSource("getHttpMethods") |
| public void testSpnegoEnabled(String httpMethod) throws Exception { |
| this.method = httpMethod; |
| |
| conf.setBoolean(OZONE_ACL_ENABLED, true); |
| conf.set(OZONE_ADMINISTRATORS, ""); |
| conf.set(OZONE_OM_HTTP_AUTH_TYPE, "kerberos"); |
| conf.set(OZONE_RECON_KERBEROS_PRINCIPAL_KEY, "recon/host1@REALM"); |
| |
| setupCluster(); |
| |
| final OzoneManager om = cluster.getOzoneManager(); |
| Collection<String> allowedUsers = |
| new LinkedHashSet<>(om.getOmAdminUsernames()); |
| allowedUsers.add("recon"); |
| |
| doCallRealMethod().when(omDbCheckpointServletMock).initialize( |
| om.getMetadataManager().getStore(), |
| om.getMetrics().getDBCheckpointMetrics(), |
| om.getAclsEnabled(), |
| allowedUsers, |
| Collections.emptySet(), |
| om.isSpnegoEnabled()); |
| |
| omDbCheckpointServletMock.init(); |
| |
| setupHttpMethod(new ArrayList<>()); |
| |
| doEndpoint(); |
| |
| // Response status should be set to 403 Forbidden since there was no user |
| // principal set in the request |
| verify(responseMock, times(1)).setStatus(HttpServletResponse.SC_FORBIDDEN); |
| |
| // Set the principal to DN in request |
| // This should also get denied since only OM and recon |
| // users should be granted access to the servlet |
| Principal userPrincipalMock = mock(Principal.class); |
| when(userPrincipalMock.getName()).thenReturn("dn/localhost@REALM"); |
| when(requestMock.getUserPrincipal()).thenReturn(userPrincipalMock); |
| |
| doEndpoint(); |
| |
| // Verify that the Response status is set to 403 again for DN user. |
| verify(responseMock, times(2)).setStatus(HttpServletResponse.SC_FORBIDDEN); |
| |
| // Now, set the principal to recon in request |
| when(userPrincipalMock.getName()).thenReturn("recon/localhost@REALM"); |
| |
| when(requestMock.getUserPrincipal()).thenReturn(userPrincipalMock); |
| when(responseMock.getOutputStream()).thenReturn(servletOutputStream); |
| |
| doEndpoint(); |
| |
| // Recon user should be able to access the servlet and download the |
| // snapshot |
| assertThat(tempFile.length()).isGreaterThan(0); |
| } |
| |
| @Test |
| public void testWriteDbDataToStream() throws Exception { |
| prepSnapshotData(); |
| // Set http param to include snapshot data. |
| when(requestMock.getParameter(OZONE_DB_CHECKPOINT_INCLUDE_SNAPSHOT_DATA)) |
| .thenReturn("true"); |
| |
| // Create a "spy" dbstore keep track of the checkpoint. |
| OzoneManager om = cluster.getOzoneManager(); |
| DBStore dbStore = om.getMetadataManager().getStore(); |
| DBStore spyDbStore = spy(dbStore); |
| |
| int metaDirLength = metaDir.toString().length() + 1; |
| String compactionLogDir = dbStore. |
| getRocksDBCheckpointDiffer().getCompactionLogDir(); |
| String sstBackupDir = dbStore. |
| getRocksDBCheckpointDiffer().getSSTBackupDir(); |
| |
| // Create files to be copied from the compaction pause |
| // temp directories so we can confirm they are correctly |
| // copied. The unexpected files should NOT be copied. |
| Path expectedLog = Paths.get(compactionLogDir, "expected" + |
| COMPACTION_LOG_FILE_NAME_SUFFIX); |
| String expectedLogStr = truncateFileName(metaDirLength, expectedLog); |
| Path expectedSst = Paths.get(sstBackupDir, "expected.sst"); |
| String expectedSstStr = truncateFileName(metaDirLength, expectedSst); |
| |
| // put "expected" fabricated files onto the fs before the files get |
| // copied to the temp dir. |
| Files.write(expectedLog, |
| "fabricatedData".getBytes(StandardCharsets.UTF_8)); |
| Files.write(expectedSst, |
| "fabricatedData".getBytes(StandardCharsets.UTF_8)); |
| |
| AtomicReference<DBCheckpoint> realCheckpoint = new AtomicReference<>(); |
| when(spyDbStore.getCheckpoint(true)).thenAnswer(b -> { |
| DBCheckpoint checkpoint = spy(dbStore.getCheckpoint(true)); |
| // Don't delete the checkpoint, because we need to compare it |
| // with the snapshot data. |
| doNothing().when(checkpoint).cleanupCheckpoint(); |
| realCheckpoint.set(checkpoint); |
| return checkpoint; |
| }); |
| |
| // Init the mock with the spyDbstore |
| doCallRealMethod().when(omDbCheckpointServletMock).initialize( |
| any(), any(), eq(false), any(), any(), eq(false)); |
| omDbCheckpointServletMock.initialize( |
| spyDbStore, om.getMetrics().getDBCheckpointMetrics(), |
| false, om.getOmAdminUsernames(), om.getOmAdminGroups(), false); |
| |
| // Get the tarball. |
| when(responseMock.getOutputStream()).thenReturn(servletOutputStream); |
| long tmpHardLinkFileCount = tmpHardLinkFileCount(); |
| omDbCheckpointServletMock.doGet(requestMock, responseMock); |
| assertEquals(tmpHardLinkFileCount, tmpHardLinkFileCount()); |
| dbCheckpoint = realCheckpoint.get(); |
| |
| // Untar the file into a temp folder to be examined. |
| String testDirName = folder.resolve("testDir").toString(); |
| int testDirLength = testDirName.length() + 1; |
| String newDbDirName = testDirName + OM_KEY_PREFIX + OM_DB_NAME; |
| int newDbDirLength = newDbDirName.length() + 1; |
| File newDbDir = new File(newDbDirName); |
| assertTrue(newDbDir.mkdirs()); |
| FileUtil.unTar(tempFile, newDbDir); |
| |
| // Move snapshot dir to correct location. |
| assertTrue(new File(newDbDirName, OM_SNAPSHOT_DIR) |
| .renameTo(new File(newDbDir.getParent(), OM_SNAPSHOT_DIR))); |
| |
| // Confirm the checkpoint directories match, (after remove extras). |
| Path checkpointLocation = dbCheckpoint.getCheckpointLocation(); |
| Set<String> initialCheckpointSet = getFiles(checkpointLocation, |
| checkpointLocation.toString().length() + 1); |
| Path finalCheckpointLocation = Paths.get(newDbDirName); |
| Set<String> finalCheckpointSet = getFiles(finalCheckpointLocation, |
| newDbDirLength); |
| |
| assertThat(finalCheckpointSet).withFailMessage("hardlink file exists in checkpoint dir") |
| .contains(OM_HARDLINK_FILE); |
| finalCheckpointSet.remove(OM_HARDLINK_FILE); |
| assertEquals(initialCheckpointSet, finalCheckpointSet); |
| |
| String shortSnapshotLocation = |
| truncateFileName(metaDirLength, Paths.get(snapshotDirName)); |
| String shortSnapshotLocation2 = |
| truncateFileName(metaDirLength, Paths.get(snapshotDirName2)); |
| String shortCompactionDirLocation = |
| truncateFileName(metaDirLength, compactionDirPath); |
| |
| Set<String> finalFullSet = |
| getFiles(Paths.get(testDirName, OM_SNAPSHOT_DIR), testDirLength); |
| |
| // Check each line in the hard link file. |
| List<String> fabricatedLinkLines = new ArrayList<>(); |
| try (Stream<String> lines = Files.lines(Paths.get(newDbDirName, |
| OM_HARDLINK_FILE))) { |
| |
| for (String line : lines.collect(Collectors.toList())) { |
| assertFalse(line.contains("CURRENT"), |
| "CURRENT file is not a hard link"); |
| if (line.contains(FABRICATED_FILE_NAME)) { |
| fabricatedLinkLines.add(line); |
| } else { |
| checkLine(shortSnapshotLocation, shortSnapshotLocation2, line); |
| // add links to the final set |
| finalFullSet.add(line.split("\t")[0]); |
| } |
| } |
| } |
| Set<String> directories = Sets.newHashSet( |
| shortSnapshotLocation, shortSnapshotLocation2, |
| shortCompactionDirLocation); |
| checkFabricatedLines(directories, fabricatedLinkLines, testDirName); |
| |
| Set<String> initialFullSet = |
| getFiles(Paths.get(metaDir.toString(), OM_SNAPSHOT_DIR), metaDirLength); |
| assertThat(finalFullSet).contains(expectedLogStr); |
| assertThat(finalFullSet).contains(expectedSstStr); |
| assertEquals(initialFullSet, finalFullSet, "expected snapshot files not found"); |
| } |
| |
| private static long tmpHardLinkFileCount() throws IOException { |
| Path tmpDirPath = Paths.get(System.getProperty(JAVA_IO_TMPDIR)); |
| try (Stream<Path> tmpFiles = Files.list(tmpDirPath)) { |
| return tmpFiles |
| .filter(path -> { |
| String regex = DATA_PREFIX + ".*" + DATA_SUFFIX; |
| return path.getFileName().toString().matches(regex); |
| }) |
| .count(); |
| } |
| } |
| |
| @Test |
| public void testWriteDbDataWithoutOmSnapshot() |
| throws Exception { |
| prepSnapshotData(); |
| |
| doCallRealMethod().when(omDbCheckpointServletMock).initialize( |
| any(), any(), anyBoolean(), any(), any(), anyBoolean()); |
| omDbCheckpointServletMock.init(); |
| |
| // Set http param to exclude snapshot data. |
| when(requestMock.getParameter(OZONE_DB_CHECKPOINT_INCLUDE_SNAPSHOT_DATA)) |
| .thenReturn(null); |
| |
| // Get the tarball. |
| Path tmpdir = folder.resolve("bootstrapData"); |
| try (FileOutputStream fileOutputStream = new FileOutputStream(tempFile)) { |
| omDbCheckpointServletMock.writeDbDataToStream(dbCheckpoint, requestMock, |
| fileOutputStream, new ArrayList<>(), new ArrayList<>(), tmpdir); |
| } |
| |
| // Untar the file into a temp folder to be examined. |
| String testDirName = folder.resolve("testDir").toString(); |
| int testDirLength = testDirName.length() + 1; |
| FileUtil.unTar(tempFile, new File(testDirName)); |
| |
| // Confirm the checkpoint directories match. |
| Path checkpointLocation = dbCheckpoint.getCheckpointLocation(); |
| Set<String> initialCheckpointSet = getFiles(checkpointLocation, |
| checkpointLocation.toString().length() + 1); |
| Path finalCheckpointLocation = Paths.get(testDirName); |
| Set<String> finalCheckpointSet = getFiles(finalCheckpointLocation, |
| testDirLength); |
| |
| assertEquals(initialCheckpointSet, finalCheckpointSet); |
| } |
| |
| @Test |
| public void testWriteDbDataWithToExcludeFileList() |
| throws Exception { |
| prepSnapshotData(); |
| |
| doCallRealMethod().when(omDbCheckpointServletMock).initialize( |
| any(), any(), anyBoolean(), any(), any(), anyBoolean()); |
| omDbCheckpointServletMock.init(); |
| |
| File dummyFile = new File(dbCheckpoint.getCheckpointLocation().toString(), |
| "dummy.sst"); |
| try (OutputStreamWriter writer = new OutputStreamWriter( |
| new FileOutputStream(dummyFile), StandardCharsets.UTF_8)) { |
| writer.write("Dummy data."); |
| } |
| assertTrue(dummyFile.exists()); |
| List<String> toExcludeList = new ArrayList<>(); |
| List<String> excludedList = new ArrayList<>(); |
| toExcludeList.add(dummyFile.getName()); |
| |
| // Set http param to exclude snapshot data. |
| when(requestMock.getParameter(OZONE_DB_CHECKPOINT_INCLUDE_SNAPSHOT_DATA)) |
| .thenReturn(null); |
| |
| // Get the tarball. |
| Path tmpdir = folder.resolve("bootstrapData"); |
| try (FileOutputStream fileOutputStream = new FileOutputStream(tempFile)) { |
| omDbCheckpointServletMock.writeDbDataToStream(dbCheckpoint, requestMock, |
| fileOutputStream, toExcludeList, excludedList, tmpdir); |
| } |
| |
| // Untar the file into a temp folder to be examined. |
| String testDirName = folder.resolve("testDir").toString(); |
| int testDirLength = testDirName.length() + 1; |
| FileUtil.unTar(tempFile, new File(testDirName)); |
| |
| // Confirm the checkpoint directories match. |
| Path checkpointLocation = dbCheckpoint.getCheckpointLocation(); |
| Set<String> initialCheckpointSet = getFiles(checkpointLocation, |
| checkpointLocation.toString().length() + 1); |
| Path finalCheckpointLocation = Paths.get(testDirName); |
| Set<String> finalCheckpointSet = getFiles(finalCheckpointLocation, |
| testDirLength); |
| |
| initialCheckpointSet.removeAll(finalCheckpointSet); |
| assertThat(initialCheckpointSet).contains(dummyFile.getName()); |
| } |
| |
| /** |
| * Calls endpoint in regards to parametrized HTTP method. |
| */ |
| private void doEndpoint() { |
| if (method.equals("POST")) { |
| omDbCheckpointServletMock.doPost(requestMock, responseMock); |
| } else { |
| omDbCheckpointServletMock.doGet(requestMock, responseMock); |
| } |
| } |
| |
| /** |
| * Parametrizes test with HTTP method. |
| * @return HTTP method. |
| */ |
| private static Stream<Arguments> getHttpMethods() { |
| return Stream.of(arguments("POST"), arguments("GET")); |
| } |
| |
| /** |
| * Setups HTTP method details depending on parametrized HTTP method. |
| * @param toExcludeList SST file names to be excluded. |
| * @throws IOException |
| */ |
| private void setupHttpMethod(List<String> toExcludeList) throws IOException { |
| if (method.equals("POST")) { |
| setupPostMethod(toExcludeList); |
| } else { |
| setupGetMethod(toExcludeList); |
| } |
| } |
| |
| /** |
| * Setups details for HTTP POST request. |
| * @param toExcludeList SST file names to be excluded. |
| * @throws IOException |
| */ |
| private void setupPostMethod(List<String> toExcludeList) |
| throws IOException { |
| when(requestMock.getMethod()).thenReturn("POST"); |
| when(requestMock.getContentType()).thenReturn("multipart/form-data; " + |
| "boundary=" + MULTIPART_FORM_DATA_BOUNDARY); |
| |
| // Generate form data |
| String crNl = "\r\n"; |
| String contentDisposition = "Content-Disposition: form-data; name=\"" + |
| OZONE_DB_CHECKPOINT_REQUEST_TO_EXCLUDE_SST + "[]\"" + crNl + crNl; |
| String boundary = "--" + MULTIPART_FORM_DATA_BOUNDARY; |
| String endBoundary = boundary + "--" + crNl; |
| StringBuilder sb = new StringBuilder(); |
| toExcludeList.forEach(sfn -> { |
| sb.append(boundary).append(crNl); |
| sb.append(contentDisposition); |
| sb.append(sfn).append(crNl); |
| }); |
| sb.append(endBoundary); |
| |
| // Use generated form data as input stream to the HTTP request |
| InputStream input = new ByteArrayInputStream( |
| sb.toString().getBytes(StandardCharsets.UTF_8)); |
| ServletInputStream inputStream = mock(ServletInputStream.class); |
| when(requestMock.getInputStream()).thenReturn(inputStream); |
| when(inputStream.read(any(byte[].class), anyInt(), anyInt())) |
| .thenAnswer(invocation -> { |
| byte[] buffer = invocation.getArgument(0); |
| int offset = invocation.getArgument(1); |
| int length = invocation.getArgument(2); |
| return input.read(buffer, offset, length); |
| }); |
| } |
| |
| /** |
| * Setups details for HTTP GET request. |
| * @param toExcludeList SST file names to be excluded. |
| */ |
| private void setupGetMethod(List<String> toExcludeList) { |
| when(requestMock.getMethod()).thenReturn("GET"); |
| when(requestMock |
| .getParameterValues(OZONE_DB_CHECKPOINT_REQUEST_TO_EXCLUDE_SST)) |
| .thenReturn(toExcludeList.toArray(new String[0])); |
| } |
| |
| private void prepSnapshotData() throws Exception { |
| setupCluster(); |
| metaDir = OMStorage.getOmDbDir(conf); |
| |
| OzoneBucket bucket = TestDataUtil |
| .createVolumeAndBucket(cluster.newClient()); |
| |
| // Create dummy keys for snapshotting. |
| TestDataUtil.createKey(bucket, UUID.randomUUID().toString(), |
| "content"); |
| TestDataUtil.createKey(bucket, UUID.randomUUID().toString(), |
| "content"); |
| |
| snapshotDirName = |
| createSnapshot(bucket.getVolumeName(), bucket.getName()); |
| snapshotDirName2 = |
| createSnapshot(bucket.getVolumeName(), bucket.getName()); |
| |
| // Create dummy snapshot to make sure it is not included. |
| Path fabricatedSnapshot = Paths.get( |
| new File(snapshotDirName).getParent(), |
| "fabricatedSnapshot"); |
| fabricatedSnapshot.toFile().mkdirs(); |
| assertTrue(Paths.get(fabricatedSnapshot.toString(), |
| FABRICATED_FILE_NAME).toFile().createNewFile()); |
| |
| // Create fabricated links to snapshot dirs |
| // to confirm that links are recognized even if |
| // they don't point to the checkpoint directory. |
| Path fabricatedFile = Paths.get(snapshotDirName, FABRICATED_FILE_NAME); |
| Path fabricatedLink = Paths.get(snapshotDirName2, FABRICATED_FILE_NAME); |
| |
| Files.write(fabricatedFile, |
| "fabricatedData".getBytes(StandardCharsets.UTF_8)); |
| Files.createLink(fabricatedLink, fabricatedFile); |
| |
| // Simulate links from the compaction dir. |
| compactionDirPath = Paths.get(metaDir.toString(), |
| OM_SNAPSHOT_DIFF_DIR, DB_COMPACTION_SST_BACKUP_DIR); |
| Path fabricatedLink2 = Paths.get(compactionDirPath.toString(), |
| FABRICATED_FILE_NAME); |
| Files.createLink(fabricatedLink2, fabricatedFile); |
| Path currentFile = Paths.get(metaDir.toString(), |
| OM_DB_NAME, "CURRENT"); |
| Path currentLink = Paths.get(compactionDirPath.toString(), "CURRENT"); |
| Files.createLink(currentLink, currentFile); |
| |
| dbCheckpoint = cluster.getOzoneManager() |
| .getMetadataManager().getStore() |
| .getCheckpoint(true); |
| |
| } |
| |
| private String createSnapshot(String vname, String bname) |
| throws IOException, InterruptedException, TimeoutException { |
| final OzoneManager om = cluster.getOzoneManager(); |
| String snapshotName = UUID.randomUUID().toString(); |
| OzoneManagerProtocol writeClient = cluster.newClient().getObjectStore() |
| .getClientProxy().getOzoneManagerClient(); |
| |
| writeClient.createSnapshot(vname, bname, snapshotName); |
| SnapshotInfo snapshotInfo = om.getMetadataManager().getSnapshotInfoTable() |
| .get(SnapshotInfo.getTableKey(vname, bname, snapshotName)); |
| String snapshotPath = getSnapshotPath(conf, snapshotInfo) |
| + OM_KEY_PREFIX; |
| GenericTestUtils.waitFor(() -> new File(snapshotPath).exists(), |
| 100, 2000); |
| return snapshotPath; |
| } |
| |
| private Set<String> getFiles(Path path, int truncateLength) |
| throws IOException { |
| return getFiles(path, truncateLength, new HashSet<>()); |
| } |
| |
| // Get all files below path, recursively, (skipping fabricated files). |
| private Set<String> getFiles(Path path, int truncateLength, |
| Set<String> fileSet) throws IOException { |
| try (Stream<Path> files = Files.list(path)) { |
| for (Path file : files.collect(Collectors.toList())) { |
| if (file.toFile().isDirectory()) { |
| getFiles(file, truncateLength, fileSet); |
| } |
| String filename = String.valueOf(file.getFileName()); |
| if (!filename.startsWith("fabricated") && |
| !filename.startsWith(OZONE_RATIS_SNAPSHOT_COMPLETE_FLAG_NAME)) { |
| fileSet.add(truncateFileName(truncateLength, file)); |
| } |
| } |
| } |
| return fileSet; |
| } |
| |
| /** |
| * Confirm fabricated link lines in hardlink file are properly |
| * formatted: "dir1/fabricatedFile dir2/fabricatedFile". |
| * |
| * The "fabricated" files/links are ones I've created by hand to |
| * fully test the code, (as opposed to the "natural" files/links |
| * created by the create snapshot process). |
| * |
| * @param directories Possible directories for the links to exist in. |
| * @param lines Text lines defining the link paths. |
| * @param testDirName Name of test directory. |
| */ |
| private void checkFabricatedLines(Set<String> directories, List<String> lines, |
| String testDirName) { |
| // find the real file |
| String realDir = null; |
| for (String dir: directories) { |
| if (Paths.get(testDirName, dir, FABRICATED_FILE_NAME).toFile().exists()) { |
| assertNull(realDir, "Exactly one copy of the fabricated file exists in the tarball"); |
| realDir = dir; |
| } |
| } |
| |
| assertNotNull(realDir, "real directory found"); |
| directories.remove(realDir); |
| Iterator<String> directoryIterator = directories.iterator(); |
| String dir0 = directoryIterator.next(); |
| String dir1 = directoryIterator.next(); |
| assertNotEquals("link directories are different", dir0, dir1); |
| |
| for (String line : lines) { |
| String[] files = line.split("\t"); |
| assertTrue( |
| files[0].startsWith(dir0) || files[0].startsWith(dir1), |
| "fabricated entry contains valid first directory: " + line); |
| assertTrue(files[1].startsWith(realDir), |
| "fabricated entry contains correct real directory: " + line); |
| Path path0 = Paths.get(files[0]); |
| Path path1 = Paths.get(files[1]); |
| assertEquals(FABRICATED_FILE_NAME, |
| String.valueOf(path0.getFileName()), |
| "fabricated entries contains correct file name: " + line); |
| assertEquals(FABRICATED_FILE_NAME, |
| String.valueOf(path1.getFileName()), |
| "fabricated entries contains correct file name: " + line); |
| } |
| } |
| |
| // Validates line in hard link file. should look something like: |
| // "dir1/x.sst x.sst". |
| private void checkLine(String shortSnapshotLocation, |
| String shortSnapshotLocation2, |
| String line) { |
| String[] files = line.split("\t"); |
| assertTrue(files[0].startsWith(shortSnapshotLocation) || |
| files[0].startsWith(shortSnapshotLocation2), |
| "hl entry starts with valid snapshot dir: " + line); |
| |
| String file0 = files[0].substring(shortSnapshotLocation.length() + 1); |
| String file1 = files[1]; |
| assertEquals(file0, file1, "hl filenames are the same"); |
| } |
| |
| @Test |
| public void testBootstrapLocking() throws Exception { |
| cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(1).build(); |
| cluster.waitForClusterToBeReady(); |
| |
| // Get the bootstrap state handlers |
| KeyManager keyManager = cluster.getOzoneManager().getKeyManager(); |
| BootstrapStateHandler keyDeletingService = |
| keyManager.getDeletingService(); |
| BootstrapStateHandler snapshotDeletingService = |
| keyManager.getSnapshotDeletingService(); |
| BootstrapStateHandler sstFilteringService = |
| keyManager.getSnapshotSstFilteringService(); |
| BootstrapStateHandler differ = |
| cluster.getOzoneManager().getMetadataManager() |
| .getStore().getRocksDBCheckpointDiffer(); |
| |
| ExecutorService executorService = Executors.newCachedThreadPool(); |
| |
| OMDBCheckpointServlet omDbCheckpointServlet = new OMDBCheckpointServlet(); |
| |
| OMDBCheckpointServlet spyServlet = spy(omDbCheckpointServlet); |
| ServletContext servletContext = mock(ServletContext.class); |
| when(servletContext.getAttribute(OzoneConsts.OM_CONTEXT_ATTRIBUTE)) |
| .thenReturn(cluster.getOzoneManager()); |
| doReturn(servletContext).when(spyServlet).getServletContext(); |
| |
| spyServlet.init(); |
| |
| // Confirm the other handlers are locked out when the bootstrap |
| // servlet takes the lock. |
| try (BootstrapStateHandler.Lock ignoredLock = |
| spyServlet.getBootstrapStateLock().lock()) { |
| confirmServletLocksOutOtherHandler(keyDeletingService, executorService); |
| confirmServletLocksOutOtherHandler(snapshotDeletingService, |
| executorService); |
| confirmServletLocksOutOtherHandler(sstFilteringService, executorService); |
| confirmServletLocksOutOtherHandler(differ, executorService); |
| } |
| // Confirm the servlet is locked out when any of the other |
| // handlers takes the lock. |
| confirmOtherHandlerLocksOutServlet(keyDeletingService, spyServlet, |
| executorService); |
| confirmOtherHandlerLocksOutServlet(snapshotDeletingService, spyServlet, |
| executorService); |
| confirmOtherHandlerLocksOutServlet(sstFilteringService, spyServlet, |
| executorService); |
| confirmOtherHandlerLocksOutServlet(differ, spyServlet, |
| executorService); |
| |
| // Confirm that servlet takes the lock when none of the other |
| // handlers have it. |
| Future<Boolean> servletTest = checkLock(spyServlet, executorService); |
| assertTrue(servletTest.get(10000, TimeUnit.MILLISECONDS)); |
| |
| executorService.shutdownNow(); |
| |
| } |
| |
| // Confirms handler can't take look the servlet already has. Assumes |
| // the servlet has already taken the lock. |
| private void confirmServletLocksOutOtherHandler(BootstrapStateHandler handler, |
| ExecutorService executorService) { |
| Future<Boolean> test = checkLock(handler, executorService); |
| // Handler should fail to take the lock because the servlet has taken it. |
| assertThrows(TimeoutException.class, |
| () -> test.get(500, TimeUnit.MILLISECONDS)); |
| } |
| |
| // Confirms Servlet can't take lock when handler has it. |
| private void confirmOtherHandlerLocksOutServlet(BootstrapStateHandler handler, |
| BootstrapStateHandler servlet, ExecutorService executorService) |
| throws InterruptedException { |
| try (BootstrapStateHandler.Lock ignoredLock = |
| handler.getBootstrapStateLock().lock()) { |
| Future<Boolean> test = checkLock(servlet, executorService); |
| // Servlet should fail to lock when other handler has taken it. |
| assertThrows(TimeoutException.class, |
| () -> test.get(500, TimeUnit.MILLISECONDS)); |
| } |
| } |
| |
| // Confirm lock is available by having handler take and release it. |
| private Future<Boolean> checkLock(BootstrapStateHandler handler, |
| ExecutorService executorService) { |
| return executorService.submit(() -> { |
| try { |
| handler.getBootstrapStateLock().lock(); |
| handler.getBootstrapStateLock().unlock(); |
| return true; |
| } catch (InterruptedException e) { |
| } |
| return false; |
| }); |
| |
| } |
| } |