| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.tools.dynamometer; |
| |
| import org.apache.hadoop.thirdparty.com.google.common.collect.Sets; |
| import java.util.Optional; |
| import java.util.concurrent.TimeoutException; |
| import java.util.concurrent.TimeUnit; |
| import java.util.function.Supplier; |
| import org.apache.hadoop.test.PlatformAssumptions; |
| import org.apache.hadoop.tools.dynamometer.workloadgenerator.audit.AuditLogDirectParser; |
| import org.apache.hadoop.tools.dynamometer.workloadgenerator.audit.AuditReplayMapper; |
| import java.io.ByteArrayOutputStream; |
| import java.io.File; |
| import java.io.FileOutputStream; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.io.OutputStream; |
| import java.net.URI; |
| import java.net.URISyntaxException; |
| import java.net.URL; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Properties; |
| import java.util.Set; |
| import java.util.UUID; |
| import java.util.zip.ZipEntry; |
| import java.util.zip.ZipOutputStream; |
| import org.apache.commons.io.FileUtils; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FSDataInputStream; |
| import org.apache.hadoop.fs.FSDataOutputStream; |
| import org.apache.hadoop.fs.FileStatus; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.fs.permission.FsPermission; |
| import org.apache.hadoop.hdfs.DistributedFileSystem; |
| import org.apache.hadoop.hdfs.MiniDFSCluster; |
| import org.apache.hadoop.io.IOUtils; |
| import org.apache.hadoop.test.GenericTestUtils; |
| import org.apache.hadoop.mapreduce.Counters; |
| import org.apache.hadoop.util.JarFinder; |
| import org.apache.hadoop.util.Shell; |
| import org.apache.hadoop.yarn.api.records.ApplicationId; |
| import org.apache.hadoop.yarn.api.records.ApplicationReport; |
| import org.apache.hadoop.yarn.api.records.ContainerId; |
| import org.apache.hadoop.yarn.api.records.NodeId; |
| import org.apache.hadoop.yarn.api.records.YarnApplicationState; |
| import org.apache.hadoop.yarn.client.api.YarnClient; |
| import org.apache.hadoop.yarn.conf.YarnConfiguration; |
| import org.apache.hadoop.yarn.exceptions.YarnException; |
| import org.apache.hadoop.yarn.server.MiniYARNCluster; |
| import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; |
| import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; |
| import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; |
| import org.apache.hadoop.yarn.util.resource.ResourceCalculator; |
| import org.junit.After; |
| import org.junit.AfterClass; |
| import org.junit.Assert; |
| import org.junit.Assume; |
| import org.junit.BeforeClass; |
| import org.junit.Test; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import static org.apache.hadoop.tools.dynamometer.DynoInfraUtils.fetchHadoopTarball; |
| import static org.apache.hadoop.hdfs.MiniDFSCluster.PROP_TEST_BUILD_DATA; |
| import static org.hamcrest.CoreMatchers.notNullValue; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertTrue; |
| import static org.junit.Assert.fail; |
| |
| /** |
| * Start a Dynamometer cluster in a MiniYARNCluster. Ensure that the NameNode is |
| * able to start correctly, exit safemode, and run some commands. Subsequently |
| * the workload job is launched and it is verified that it completes |
| * successfully and is able to replay commands as expected. |
| * |
| * To run this test JAVA_HOME must be set correctly, and the {@code tar} utility |
| * must be available. |
| * |
| * You can optionally specify which version of HDFS should be started within the |
| * Dynamometer cluster; the default is {@value HADOOP_BIN_VERSION_DEFAULT}. This |
| * can be adjusted by setting the {@value HADOOP_BIN_VERSION_KEY} property. This |
| * will automatically download the correct Hadoop tarball for the specified |
| * version. It downloads from an Apache mirror (by default |
| * {@value DynoInfraUtils#APACHE_DOWNLOAD_MIRROR_DEFAULT}); which mirror is used |
| * can be controlled with the {@value DynoInfraUtils#APACHE_DOWNLOAD_MIRROR_KEY} |
| * property. Note that mirrors normally contain only the latest releases on any |
| * given release line; you may need to use |
| * {@code http://archive.apache.org/dist/} for older releases. The downloaded |
| * tarball will be stored in the test directory and can be reused between test |
| * executions. Alternatively, you can specify the {@value HADOOP_BIN_PATH_KEY} |
| * property to point directly to a Hadoop tarball which is present locally and |
| * no download will occur. |
| */ |
| public class TestDynamometerInfra { |
| |
| private static final Logger LOG = |
| LoggerFactory.getLogger(TestDynamometerInfra.class); |
| |
| private static final int MINICLUSTER_NUM_NMS = 3; |
| private static final int MINICLUSTER_NUM_DNS = 1; |
| |
| private static final String HADOOP_BIN_PATH_KEY = "dyno.hadoop.bin.path"; |
| private static final String HADOOP_BIN_VERSION_KEY = |
| "dyno.hadoop.bin.version"; |
| private static final String HADOOP_BIN_VERSION_DEFAULT = "3.1.4"; |
| private static final String FSIMAGE_FILENAME = "fsimage_0000000000000061740"; |
| private static final String VERSION_FILENAME = "VERSION"; |
| |
| private static final String HADOOP_BIN_UNPACKED_DIR_PREFIX = |
| "hadoop_unpacked_"; |
| |
| private static final String NAMENODE_NODELABEL = "dyno_namenode"; |
| private static final String DATANODE_NODELABEL = "dyno_datanode"; |
| |
| private static final String OUTPUT_PATH = "/tmp/trace_output_direct"; |
| |
| private static MiniDFSCluster miniDFSCluster; |
| private static MiniYARNCluster miniYARNCluster; |
| private static YarnClient yarnClient; |
| private static FileSystem fs; |
| private static Configuration conf; |
| private static Configuration yarnConf; |
| private static Path fsImageTmpPath; |
| private static Path fsVersionTmpPath; |
| private static Path blockImageOutputDir; |
| private static Path auditTraceDir; |
| private static Path confZip; |
| private static File testBaseDir; |
| private static File hadoopTarballPath; |
| private static File hadoopUnpackedDir; |
| |
| private ApplicationId infraAppId; |
| |
| @BeforeClass |
| public static void setupClass() throws Exception { |
| PlatformAssumptions.assumeNotWindows("Dynamometer will not run on Windows"); |
| Assume.assumeThat("JAVA_HOME must be set properly", |
| System.getenv("JAVA_HOME"), notNullValue()); |
| try { |
| Shell.ShellCommandExecutor tarCheck = new Shell.ShellCommandExecutor( |
| new String[]{"bash", "-c", "command -v tar"}); |
| tarCheck.execute(); |
| Assume.assumeTrue("tar command is not available", |
| tarCheck.getExitCode() == 0); |
| } catch (IOException ioe) { |
| Assume.assumeNoException("Unable to execute a shell command", ioe); |
| } |
| |
| conf = new Configuration(); |
| // Follow the conventions of MiniDFSCluster |
| testBaseDir = new File( |
| System.getProperty(PROP_TEST_BUILD_DATA, "build/test/data")); |
| String hadoopBinVersion = System.getProperty(HADOOP_BIN_VERSION_KEY, |
| HADOOP_BIN_VERSION_DEFAULT); |
| if (System.getProperty(HADOOP_BIN_PATH_KEY) == null) { |
| hadoopTarballPath = fetchHadoopTarball(testBaseDir, hadoopBinVersion, |
| conf, LOG); |
| } else { |
| hadoopTarballPath = new File(System.getProperty(HADOOP_BIN_PATH_KEY)); |
| } |
| if (testBaseDir.exists()) { |
| // Delete any old unpacked bin dirs that weren't previously cleaned up |
| File[] oldUnpackedDirs = testBaseDir.listFiles( |
| (dir, name) -> name.startsWith(HADOOP_BIN_UNPACKED_DIR_PREFIX)); |
| if (oldUnpackedDirs != null) { |
| for (File oldDir : oldUnpackedDirs) { |
| FileUtils.deleteQuietly(oldDir); |
| } |
| } |
| } |
| // Set up the Hadoop binary to be used as the system-level Hadoop install |
| hadoopUnpackedDir = new File(testBaseDir, |
| HADOOP_BIN_UNPACKED_DIR_PREFIX + UUID.randomUUID()); |
| assertTrue("Failed to make temporary directory", |
| hadoopUnpackedDir.mkdirs()); |
| Shell.ShellCommandExecutor shexec = new Shell.ShellCommandExecutor( |
| new String[] {"tar", "xzf", hadoopTarballPath.getAbsolutePath(), "-C", |
| hadoopUnpackedDir.getAbsolutePath()}); |
| shexec.execute(); |
| if (shexec.getExitCode() != 0) { |
| fail("Unable to execute tar to expand Hadoop binary"); |
| } |
| |
| conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 128); |
| conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true); |
| for (String q : new String[] {"root", "root.default"}) { |
| conf.setInt(CapacitySchedulerConfiguration.PREFIX + q + "." |
| + CapacitySchedulerConfiguration.CAPACITY, 100); |
| String accessibleNodeLabelPrefix = CapacitySchedulerConfiguration.PREFIX |
| + q + "." + CapacitySchedulerConfiguration.ACCESSIBLE_NODE_LABELS; |
| conf.set(accessibleNodeLabelPrefix, |
| CapacitySchedulerConfiguration.ALL_ACL); |
| conf.setInt(accessibleNodeLabelPrefix + "." + DATANODE_NODELABEL + "." |
| + CapacitySchedulerConfiguration.CAPACITY, 100); |
| conf.setInt(accessibleNodeLabelPrefix + "." + NAMENODE_NODELABEL + "." |
| + CapacitySchedulerConfiguration.CAPACITY, 100); |
| } |
| // This is necessary to have the RM respect our vcore allocation request |
| conf.setClass(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS, |
| DominantResourceCalculator.class, ResourceCalculator.class); |
| conf.setBoolean(YarnConfiguration.NM_DISK_HEALTH_CHECK_ENABLE, false); |
| miniYARNCluster = new MiniYARNCluster(TestDynamometerInfra.class.getName(), |
| 1, MINICLUSTER_NUM_NMS, 1, 1); |
| miniYARNCluster.init(conf); |
| miniYARNCluster.start(); |
| |
| yarnConf = miniYARNCluster.getConfig(); |
| miniDFSCluster = new MiniDFSCluster.Builder(conf).format(true) |
| .numDataNodes(MINICLUSTER_NUM_DNS).build(); |
| miniDFSCluster.waitClusterUp(); |
| FileSystem.setDefaultUri(conf, miniDFSCluster.getURI()); |
| FileSystem.setDefaultUri(yarnConf, miniDFSCluster.getURI()); |
| fs = miniDFSCluster.getFileSystem(); |
| |
| URL url = Thread.currentThread().getContextClassLoader() |
| .getResource("yarn-site.xml"); |
| if (url == null) { |
| throw new RuntimeException( |
| "Could not find 'yarn-site.xml' dummy file in classpath"); |
| } |
| yarnConf.set(YarnConfiguration.YARN_APPLICATION_CLASSPATH, |
| new File(url.getPath()).getParent()); |
| // Write the XML to a buffer before writing to the file. writeXml() can |
| // trigger a read of the existing yarn-site.xml, so writing directly could |
| // trigger a read of the file while it is in an inconsistent state |
| // (partially written) |
| try (ByteArrayOutputStream bytesOut = new ByteArrayOutputStream()) { |
| yarnConf.writeXml(bytesOut); |
| try (OutputStream fileOut = new FileOutputStream( |
| new File(url.getPath()))) { |
| fileOut.write(bytesOut.toByteArray()); |
| } |
| } |
| |
| yarnClient = YarnClient.createYarnClient(); |
| yarnClient.init(new Configuration(yarnConf)); |
| yarnClient.start(); |
| |
| fsImageTmpPath = fs.makeQualified(new Path("/tmp/" + FSIMAGE_FILENAME)); |
| fsVersionTmpPath = fs.makeQualified(new Path("/tmp/" + VERSION_FILENAME)); |
| blockImageOutputDir = fs.makeQualified(new Path("/tmp/blocks")); |
| auditTraceDir = fs.makeQualified(new Path("/tmp/audit_trace_direct")); |
| confZip = fs.makeQualified(new Path("/tmp/conf.zip")); |
| |
| uploadFsimageResourcesToHDFS(hadoopBinVersion); |
| |
| miniYARNCluster.waitForNodeManagersToConnect(30000); |
| |
| RMNodeLabelsManager nodeLabelManager = miniYARNCluster.getResourceManager() |
| .getRMContext().getNodeLabelManager(); |
| nodeLabelManager.addToCluserNodeLabelsWithDefaultExclusivity( |
| Sets.newHashSet(NAMENODE_NODELABEL, DATANODE_NODELABEL)); |
| Map<NodeId, Set<String>> nodeLabels = new HashMap<>(); |
| nodeLabels.put(miniYARNCluster.getNodeManager(0).getNMContext().getNodeId(), |
| Sets.newHashSet(NAMENODE_NODELABEL)); |
| nodeLabels.put(miniYARNCluster.getNodeManager(1).getNMContext().getNodeId(), |
| Sets.newHashSet(DATANODE_NODELABEL)); |
| nodeLabelManager.addLabelsToNode(nodeLabels); |
| } |
| |
| @AfterClass |
| public static void teardownClass() throws Exception { |
| if (miniDFSCluster != null) { |
| miniDFSCluster.shutdown(true); |
| miniDFSCluster = null; |
| } |
| if (yarnClient != null) { |
| yarnClient.stop(); |
| yarnClient = null; |
| } |
| if (miniYARNCluster != null) { |
| miniYARNCluster.getResourceManager().stop(); |
| miniYARNCluster.getResourceManager().waitForServiceToStop(30000); |
| miniYARNCluster.stop(); |
| miniYARNCluster.waitForServiceToStop(30000); |
| FileUtils.deleteDirectory(miniYARNCluster.getTestWorkDir()); |
| miniYARNCluster = null; |
| } |
| if (hadoopUnpackedDir != null) { |
| FileUtils.deleteDirectory(hadoopUnpackedDir); |
| } |
| } |
| |
| @After |
| public void tearDown() throws Exception { |
| if (infraAppId != null && yarnClient != null) { |
| yarnClient.killApplication(infraAppId); |
| } |
| infraAppId = null; |
| } |
| |
| @Test(timeout = 15 * 60 * 1000) |
| public void testNameNodeInYARN() throws Exception { |
| Configuration localConf = new Configuration(yarnConf); |
| localConf.setLong(AuditLogDirectParser.AUDIT_START_TIMESTAMP_KEY, 60000); |
| |
| final Client client = createAndStartClient(localConf); |
| |
| awaitApplicationStartup(); |
| |
| long startTime = System.currentTimeMillis(); |
| long maxWaitTimeMs = TimeUnit.MINUTES.toMillis(10); |
| Supplier<Boolean> exitCheckSupplier = () -> { |
| if (System.currentTimeMillis() - startTime > maxWaitTimeMs) { |
| // Wait at most 10 minutes for the NameNode to start and be ready |
| return true; |
| } |
| try { |
| // Exit immediately if the YARN app fails |
| return yarnClient.getApplicationReport(infraAppId) |
| .getYarnApplicationState() == YarnApplicationState.FAILED; |
| } catch (IOException | YarnException e) { |
| return true; |
| } |
| }; |
| Optional<Properties> namenodeProperties = DynoInfraUtils |
| .waitForAndGetNameNodeProperties(exitCheckSupplier, localConf, |
| client.getNameNodeInfoPath(), LOG); |
| if (!namenodeProperties.isPresent()) { |
| fail("Unable to fetch NameNode properties"); |
| } |
| |
| DynoInfraUtils.waitForNameNodeReadiness(namenodeProperties.get(), 3, false, |
| exitCheckSupplier, localConf, LOG); |
| |
| assertClusterIsFunctional(localConf, namenodeProperties.get()); |
| |
| Map<ContainerId, Container> namenodeContainers = miniYARNCluster |
| .getNodeManager(0).getNMContext().getContainers(); |
| Map<ContainerId, Container> datanodeContainers = miniYARNCluster |
| .getNodeManager(1).getNMContext().getContainers(); |
| Map<ContainerId, Container> amContainers = miniYARNCluster.getNodeManager(2) |
| .getNMContext().getContainers(); |
| assertEquals(1, namenodeContainers.size()); |
| assertEquals(2, |
| namenodeContainers.keySet().iterator().next().getContainerId()); |
| assertEquals(2, datanodeContainers.size()); |
| assertEquals(1, amContainers.size()); |
| assertEquals(1, amContainers.keySet().iterator().next().getContainerId()); |
| |
| LOG.info("Waiting for workload job to start and complete"); |
| GenericTestUtils.waitFor(() -> { |
| try { |
| return client.getWorkloadJob() != null |
| && client.getWorkloadJob().isComplete(); |
| } catch (IOException | IllegalStateException e) { |
| return false; |
| } |
| }, 3000, 60000); |
| LOG.info("Workload job completed"); |
| |
| if (!client.getWorkloadJob().isSuccessful()) { |
| fail("Workload job failed"); |
| } |
| Counters counters = client.getWorkloadJob().getCounters(); |
| assertEquals(6, |
| counters.findCounter(AuditReplayMapper.REPLAYCOUNTERS.TOTALCOMMANDS) |
| .getValue()); |
| assertEquals(1, |
| counters |
| .findCounter(AuditReplayMapper.REPLAYCOUNTERS.TOTALINVALIDCOMMANDS) |
| .getValue()); |
| |
| LOG.info("Waiting for infra application to exit"); |
| GenericTestUtils.waitFor(() -> { |
| try { |
| ApplicationReport report = yarnClient |
| .getApplicationReport(infraAppId); |
| return report |
| .getYarnApplicationState() == YarnApplicationState.KILLED; |
| } catch (IOException | YarnException e) { |
| return false; |
| } |
| }, 3000, 300000); |
| |
| LOG.info("Waiting for metrics file to be ready"); |
| // Try to read the metrics file |
| Path hdfsStoragePath = new Path(fs.getHomeDirectory(), |
| DynoConstants.DYNAMOMETER_STORAGE_DIR + "/" + infraAppId); |
| final Path metricsPath = new Path(hdfsStoragePath, "namenode_metrics"); |
| GenericTestUtils.waitFor(() -> { |
| try { |
| FSDataInputStream in = fs.open(metricsPath); |
| String metricsOutput = in.readUTF(); |
| in.close(); |
| // Just assert that there is some metrics content in there |
| assertTrue(metricsOutput.contains("JvmMetrics")); |
| return true; |
| } catch (IOException ioe) { |
| return false; |
| } |
| }, 3000, 60000); |
| assertTrue(fs.exists(new Path(OUTPUT_PATH))); |
| } |
| |
| private void assertClusterIsFunctional(Configuration localConf, |
| Properties namenodeProperties) throws IOException { |
| // Test that we can successfully write to / read from the cluster |
| try { |
| URI nameNodeUri = DynoInfraUtils.getNameNodeHdfsUri(namenodeProperties); |
| DistributedFileSystem dynoFS = |
| (DistributedFileSystem) FileSystem.get(nameNodeUri, localConf); |
| Path testFile = new Path("/tmp/test/foo"); |
| dynoFS.mkdir(testFile.getParent(), FsPermission.getDefault()); |
| FSDataOutputStream out = dynoFS.create(testFile, (short) 1); |
| out.write(42); |
| out.hsync(); |
| out.close(); |
| FileStatus[] stats = dynoFS.listStatus(testFile.getParent()); |
| assertEquals(1, stats.length); |
| assertEquals("foo", stats[0].getPath().getName()); |
| } catch (IOException e) { |
| LOG.error("Failed to write or read", e); |
| throw e; |
| } |
| } |
| |
| private void awaitApplicationStartup() |
| throws TimeoutException, InterruptedException { |
| LOG.info("Waiting for application ID to become available"); |
| GenericTestUtils.waitFor(() -> { |
| try { |
| List<ApplicationReport> apps = yarnClient.getApplications(); |
| if (apps.size() == 1) { |
| infraAppId = apps.get(0).getApplicationId(); |
| return true; |
| } else if (apps.size() > 1) { |
| fail("Unexpected: more than one application"); |
| } |
| } catch (IOException | YarnException e) { |
| fail("Unexpected exception: " + e); |
| } |
| return false; |
| }, 1000, 60000); |
| } |
| |
| private Client createAndStartClient(Configuration localConf) { |
| final Client client = new Client(JarFinder.getJar(ApplicationMaster.class), |
| JarFinder.getJar(Assert.class)); |
| client.setConf(localConf); |
| Thread appThread = new Thread(() -> { |
| try { |
| client.run(new String[] {"-" + Client.MASTER_MEMORY_MB_ARG, "128", |
| "-" + Client.CONF_PATH_ARG, confZip.toString(), |
| "-" + Client.BLOCK_LIST_PATH_ARG, |
| blockImageOutputDir.toString(), "-" + Client.FS_IMAGE_DIR_ARG, |
| fsImageTmpPath.getParent().toString(), |
| "-" + Client.HADOOP_BINARY_PATH_ARG, |
| hadoopTarballPath.getAbsolutePath(), |
| "-" + AMOptions.DATANODES_PER_CLUSTER_ARG, "2", |
| "-" + AMOptions.DATANODE_MEMORY_MB_ARG, "128", |
| "-" + AMOptions.DATANODE_NODELABEL_ARG, DATANODE_NODELABEL, |
| "-" + AMOptions.NAMENODE_MEMORY_MB_ARG, "256", |
| "-" + AMOptions.NAMENODE_METRICS_PERIOD_ARG, "1", |
| "-" + AMOptions.NAMENODE_NODELABEL_ARG, NAMENODE_NODELABEL, |
| "-" + AMOptions.SHELL_ENV_ARG, |
| "HADOOP_HOME=" + getHadoopHomeLocation(), |
| "-" + AMOptions.SHELL_ENV_ARG, |
| "HADOOP_CONF_DIR=" + getHadoopHomeLocation() + "/etc/hadoop", |
| "-" + Client.WORKLOAD_REPLAY_ENABLE_ARG, |
| "-" + Client.WORKLOAD_INPUT_PATH_ARG, |
| fs.makeQualified(new Path("/tmp/audit_trace_direct")).toString(), |
| "-" + Client.WORKLOAD_OUTPUT_PATH_ARG, |
| fs.makeQualified(new Path(OUTPUT_PATH)).toString(), |
| "-" + Client.WORKLOAD_THREADS_PER_MAPPER_ARG, "1", |
| "-" + Client.WORKLOAD_START_DELAY_ARG, "10s", |
| "-" + AMOptions.NAMENODE_ARGS_ARG, |
| "-Ddfs.namenode.safemode.extension=0"}); |
| } catch (Exception e) { |
| LOG.error("Error running client", e); |
| } |
| }); |
| |
| appThread.start(); |
| return client; |
| } |
| |
| private static URI getResourcePath(String resourceName) { |
| try { |
| return TestDynamometerInfra.class.getClassLoader() |
| .getResource(resourceName).toURI(); |
| } catch (URISyntaxException e) { |
| return null; |
| } |
| } |
| |
| /** |
| * Get the Hadoop home location (i.e. for {@code HADOOP_HOME}) as the only |
| * directory within the unpacked location of the Hadoop tarball. |
| * |
| * @return The absolute path to the Hadoop home directory. |
| */ |
| private String getHadoopHomeLocation() { |
| File[] files = hadoopUnpackedDir.listFiles(); |
| if (files == null || files.length != 1) { |
| fail("Should be 1 directory within the Hadoop unpacked dir"); |
| } |
| return files[0].getAbsolutePath(); |
| } |
| |
| /** |
| * Look for the resource files relevant to {@code hadoopBinVersion} and upload |
| * them onto the MiniDFSCluster's HDFS for use by the subsequent jobs. |
| * |
| * @param hadoopBinVersion |
| * The version string (e.g. "3.1.1") for which to look for resources. |
| */ |
| private static void uploadFsimageResourcesToHDFS(String hadoopBinVersion) |
| throws IOException { |
| // Keep only the major/minor version for the resources path |
| String[] versionComponents = hadoopBinVersion.split("\\."); |
| if (versionComponents.length < 2) { |
| fail( |
| "At least major and minor version are required to be specified; got: " |
| + hadoopBinVersion); |
| } |
| String hadoopResourcesPath = "hadoop_" + versionComponents[0] + "_" |
| + versionComponents[1]; |
| String fsImageResourcePath = hadoopResourcesPath + "/" + FSIMAGE_FILENAME; |
| fs.copyFromLocalFile(new Path(getResourcePath(fsImageResourcePath)), |
| fsImageTmpPath); |
| fs.copyFromLocalFile( |
| new Path(getResourcePath(fsImageResourcePath + ".md5")), |
| fsImageTmpPath.suffix(".md5")); |
| fs.copyFromLocalFile( |
| new Path(getResourcePath(hadoopResourcesPath + "/" + VERSION_FILENAME)), |
| fsVersionTmpPath); |
| fs.mkdirs(auditTraceDir); |
| IOUtils.copyBytes( |
| TestDynamometerInfra.class.getClassLoader() |
| .getResourceAsStream("audit_trace_direct/audit0"), |
| fs.create(new Path(auditTraceDir, "audit0")), conf, true); |
| fs.mkdirs(blockImageOutputDir); |
| for (String blockFile : new String[] {"dn0-a-0-r-00000", "dn1-a-0-r-00001", |
| "dn2-a-0-r-00002"}) { |
| IOUtils.copyBytes( |
| TestDynamometerInfra.class.getClassLoader() |
| .getResourceAsStream("blocks/" + blockFile), |
| fs.create(new Path(blockImageOutputDir, blockFile)), conf, true); |
| } |
| File tempConfZip = new File(testBaseDir, "conf.zip"); |
| ZipOutputStream zos = new ZipOutputStream( |
| new FileOutputStream(tempConfZip)); |
| for (String file : new String[] {"core-site.xml", "hdfs-site.xml", |
| "log4j.properties"}) { |
| zos.putNextEntry(new ZipEntry("etc/hadoop/" + file)); |
| InputStream is = TestDynamometerInfra.class.getClassLoader() |
| .getResourceAsStream("conf/etc/hadoop/" + file); |
| IOUtils.copyBytes(is, zos, conf, false); |
| is.close(); |
| zos.closeEntry(); |
| } |
| zos.close(); |
| fs.copyFromLocalFile(new Path(tempConfZip.toURI()), confZip); |
| tempConfZip.delete(); |
| } |
| |
| } |