| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hdfs.server.namenode; |
| |
| import java.io.File; |
| import java.io.FileOutputStream; |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.EnumSet; |
| import java.util.List; |
| |
| import javax.security.auth.login.LoginException; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.commons.logging.impl.Log4JLogger; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.CreateFlag; |
| import org.apache.hadoop.fs.permission.FsPermission; |
| import org.apache.hadoop.hdfs.protocol.Block; |
| import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; |
| import org.apache.hadoop.hdfs.protocol.DatanodeInfo; |
| import org.apache.hadoop.hdfs.protocol.ExtendedBlock; |
| import org.apache.hadoop.hdfs.protocol.FSConstants; |
| import org.apache.hadoop.hdfs.protocol.LocatedBlock; |
| import org.apache.hadoop.hdfs.server.datanode.DataNode; |
| import org.apache.hadoop.hdfs.server.datanode.DataStorage; |
| import org.apache.hadoop.hdfs.server.protocol.BlockCommand; |
| import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; |
| import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; |
| import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; |
| import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; |
| import org.apache.hadoop.hdfs.HdfsConfiguration; |
| import org.apache.hadoop.hdfs.DFSConfigKeys; |
| import org.apache.hadoop.io.EnumSetWritable; |
| import org.apache.hadoop.net.DNS; |
| import org.apache.hadoop.net.NetworkTopology; |
| import org.apache.hadoop.security.Groups; |
| import org.apache.hadoop.util.StringUtils; |
| import org.apache.log4j.Level; |
| import org.apache.log4j.LogManager; |
| |
| /** |
| * Main class for a series of name-node benchmarks. |
| * |
| * Each benchmark measures throughput and average execution time |
| * of a specific name-node operation, e.g. file creation or block reports. |
| * |
 * The benchmark does not involve any other Hadoop components
 * except for the name-node. Each operation is executed
 * by calling the respective name-node method directly.
 * The name-node here is real; all other components are simulated.
| * |
| * Command line arguments for the benchmark include: |
| * <ol> |
| * <li>total number of operations to be performed,</li> |
| * <li>number of threads to run these operations,</li> |
 * <li>followed by operation-specific input parameters.</li>
| * <li>-logLevel L specifies the logging level when the benchmark runs. |
| * The default logging level is {@link Level#ERROR}.</li> |
| * <li>-UGCacheRefreshCount G will cause the benchmark to call |
| * {@link NameNode#refreshUserToGroupsMappings()} after |
| * every G operations, which purges the name-node's user group cache. |
| * By default the refresh is never called.</li> |
| * <li>-keepResults do not clean up the name-space after execution.</li> |
| * <li>-useExisting do not recreate the name-space, use existing data.</li> |
| * </ol> |
| * |
 * The benchmark first generates inputs for each thread so that the
 * input generation overhead does not affect the resulting statistics.
 * The number of operations performed by the threads is practically the same.
 * More precisely, the difference between the number of operations
 * performed by any two threads does not exceed 1.
| * |
| * Then the benchmark executes the specified number of operations using |
| * the specified number of threads and outputs the resulting stats. |
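 *
 * A typical invocation, assuming the class is run through the standard
 * hadoop script (the option values below are purely illustrative):
 * <pre>
 * hadoop org.apache.hadoop.hdfs.server.namenode.NNThroughputBenchmark \
 *     -op create -threads 10 -files 1000 -filesPerDir 4 -logLevel INFO
 * </pre>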
| */ |
| public class NNThroughputBenchmark { |
| private static final Log LOG = LogFactory.getLog(NNThroughputBenchmark.class); |
| private static final int BLOCK_SIZE = 16; |
| private static final String GENERAL_OPTIONS_USAGE = |
| " [-keepResults] | [-logLevel L] | [-UGCacheRefreshCount G]"; |
| |
| static Configuration config; |
| static NameNode nameNode; |
| |
| NNThroughputBenchmark(Configuration conf) throws IOException, LoginException { |
| config = conf; |
| // We do not need many handlers, since each thread simulates a handler |
| // by calling name-node methods directly |
| config.setInt(DFSConfigKeys.DFS_DATANODE_HANDLER_COUNT_KEY, 1); |
| // set exclude file |
| config.set(DFSConfigKeys.DFS_HOSTS_EXCLUDE, |
| "${hadoop.tmp.dir}/dfs/hosts/exclude"); |
| File excludeFile = new File(config.get(DFSConfigKeys.DFS_HOSTS_EXCLUDE, |
| "exclude")); |
    if(! excludeFile.exists()) {
      File excludeDir = excludeFile.getParentFile();
      if(!excludeDir.exists() && !excludeDir.mkdirs())
        throw new IOException("NNThroughputBenchmark: cannot mkdir " + excludeFile);
    }
| new FileOutputStream(excludeFile).close(); |
| // Start the NameNode |
| String[] argv = new String[] {}; |
| nameNode = NameNode.createNameNode(argv, config); |
| } |
| |
| void close() throws IOException { |
| nameNode.stop(); |
| } |
| |
| static void setNameNodeLoggingLevel(Level logLevel) { |
| LOG.fatal("Log level = " + logLevel.toString()); |
    // change the log level of the name-node and related loggers
| LogManager.getLogger(NameNode.class.getName()).setLevel(logLevel); |
| ((Log4JLogger)NameNode.stateChangeLog).getLogger().setLevel(logLevel); |
| LogManager.getLogger(NetworkTopology.class.getName()).setLevel(logLevel); |
| LogManager.getLogger(FSNamesystem.class.getName()).setLevel(logLevel); |
| LogManager.getLogger(LeaseManager.class.getName()).setLevel(logLevel); |
| LogManager.getLogger(Groups.class.getName()).setLevel(logLevel); |
| } |
| |
| /** |
| * Base class for collecting operation statistics. |
| * |
   * Subclass this class in order to collect statistics for a
| * specific name-node operation. |
| */ |
| abstract class OperationStatsBase { |
| protected static final String BASE_DIR_NAME = "/nnThroughputBenchmark"; |
| protected static final String OP_ALL_NAME = "all"; |
| protected static final String OP_ALL_USAGE = "-op all <other ops options>"; |
| |
| protected String baseDir; |
| protected short replication; |
| protected int numThreads = 0; // number of threads |
| protected int numOpsRequired = 0; // number of operations requested |
| protected int numOpsExecuted = 0; // number of operations executed |
| protected long cumulativeTime = 0; // sum of times for each op |
| protected long elapsedTime = 0; // time from start to finish |
| protected boolean keepResults = false;// don't clean base directory on exit |
| protected Level logLevel; // logging level, ERROR by default |
| protected int ugcRefreshCount = 0; // user group cache refresh count |
| |
| protected List<StatsDaemon> daemons; |
| |
| /** |
| * Operation name. |
| */ |
| abstract String getOpName(); |
| |
| /** |
| * Parse command line arguments. |
| * |
| * @param args arguments |
| * @throws IOException |
| */ |
| abstract void parseArguments(List<String> args) throws IOException; |
| |
| /** |
| * Generate inputs for each daemon thread. |
| * |
| * @param opsPerThread number of inputs for each thread. |
| * @throws IOException |
| */ |
| abstract void generateInputs(int[] opsPerThread) throws IOException; |
| |
| /** |
| * This corresponds to the arg1 argument of |
| * {@link #executeOp(int, int, String)}, which can have different meanings |
| * depending on the operation performed. |
| * |
| * @param daemonId |
| * @return the argument |
| */ |
| abstract String getExecutionArgument(int daemonId); |
| |
| /** |
| * Execute name-node operation. |
| * |
| * @param daemonId id of the daemon calling this method. |
     * @param inputIdx serial index of the operation called by the daemon.
| * @param arg1 operation specific argument. |
| * @return time of the individual name-node call. |
| * @throws IOException |
| */ |
| abstract long executeOp(int daemonId, int inputIdx, String arg1) throws IOException; |
| |
| /** |
| * Print the results of the benchmarking. |
| */ |
| abstract void printResults(); |
| |
| OperationStatsBase() { |
| baseDir = BASE_DIR_NAME + "/" + getOpName(); |
| replication = (short) config.getInt(DFSConfigKeys.DFS_REPLICATION_KEY, 3); |
| numOpsRequired = 10; |
| numThreads = 3; |
| logLevel = Level.ERROR; |
| ugcRefreshCount = Integer.MAX_VALUE; |
| } |
| |
| void benchmark() throws IOException { |
| daemons = new ArrayList<StatsDaemon>(); |
| long start = 0; |
| try { |
| numOpsExecuted = 0; |
| cumulativeTime = 0; |
| if(numThreads < 1) |
| return; |
        int tIdx = 0; // thread index < numThreads
| int opsPerThread[] = new int[numThreads]; |
| for(int opsScheduled = 0; opsScheduled < numOpsRequired; |
| opsScheduled += opsPerThread[tIdx++]) { |
          // spread the remaining operations evenly over the remaining threads
| opsPerThread[tIdx] = (numOpsRequired-opsScheduled)/(numThreads-tIdx); |
| if(opsPerThread[tIdx] == 0) |
| opsPerThread[tIdx] = 1; |
| } |
| // if numThreads > numOpsRequired then the remaining threads will do nothing |
| for(; tIdx < numThreads; tIdx++) |
| opsPerThread[tIdx] = 0; |
| generateInputs(opsPerThread); |
| setNameNodeLoggingLevel(logLevel); |
| for(tIdx=0; tIdx < numThreads; tIdx++) |
| daemons.add(new StatsDaemon(tIdx, opsPerThread[tIdx], this)); |
| start = System.currentTimeMillis(); |
| LOG.info("Starting " + numOpsRequired + " " + getOpName() + "(s)."); |
| for(StatsDaemon d : daemons) |
| d.start(); |
| } finally { |
        while(isInProgress()) {
| // try {Thread.sleep(500);} catch (InterruptedException e) {} |
| } |
| elapsedTime = System.currentTimeMillis() - start; |
| for(StatsDaemon d : daemons) { |
| incrementStats(d.localNumOpsExecuted, d.localCumulativeTime); |
| // System.out.println(d.toString() + ": ops Exec = " + d.localNumOpsExecuted); |
| } |
| } |
| } |
| |
    private boolean isInProgress() {
| for(StatsDaemon d : daemons) |
| if(d.isInProgress()) |
| return true; |
| return false; |
| } |
| |
| void cleanUp() throws IOException { |
| nameNode.setSafeMode(FSConstants.SafeModeAction.SAFEMODE_LEAVE); |
| if(!keepResults) |
| nameNode.delete(getBaseDir(), true); |
| } |
| |
| int getNumOpsExecuted() { |
| return numOpsExecuted; |
| } |
| |
| long getCumulativeTime() { |
| return cumulativeTime; |
| } |
| |
| long getElapsedTime() { |
| return elapsedTime; |
| } |
| |
| long getAverageTime() { |
| return numOpsExecuted == 0 ? 0 : cumulativeTime / numOpsExecuted; |
| } |
| |
| double getOpsPerSecond() { |
| return elapsedTime == 0 ? 0 : 1000*(double)numOpsExecuted / elapsedTime; |
| } |
| |
| String getBaseDir() { |
| return baseDir; |
| } |
| |
| String getClientName(int idx) { |
| return getOpName() + "-client-" + idx; |
| } |
| |
| void incrementStats(int ops, long time) { |
| numOpsExecuted += ops; |
| cumulativeTime += time; |
| } |
| |
| /** |
     * Parse the first two arguments, corresponding to the "-op" option.
     * 
     * @param args command line arguments
     * @return true if the operation is "all", which means that options not
     *         related to this operation should be ignored; false otherwise,
     *         meaning that usage should be printed when an unrelated option
     *         is encountered.
| */ |
| protected boolean verifyOpArgument(List<String> args) { |
| if(args.size() < 2 || ! args.get(0).startsWith("-op")) |
| printUsage(); |
| |
| // process common options |
| int krIndex = args.indexOf("-keepResults"); |
| keepResults = (krIndex >= 0); |
| if(keepResults) { |
| args.remove(krIndex); |
| } |
| |
| int llIndex = args.indexOf("-logLevel"); |
| if(llIndex >= 0) { |
| if(args.size() <= llIndex + 1) |
| printUsage(); |
| logLevel = Level.toLevel(args.get(llIndex+1), Level.ERROR); |
| args.remove(llIndex+1); |
| args.remove(llIndex); |
| } |
| |
| int ugrcIndex = args.indexOf("-UGCacheRefreshCount"); |
| if(ugrcIndex >= 0) { |
| if(args.size() <= ugrcIndex + 1) |
| printUsage(); |
| int g = Integer.parseInt(args.get(ugrcIndex+1)); |
| if(g > 0) ugcRefreshCount = g; |
| args.remove(ugrcIndex+1); |
| args.remove(ugrcIndex); |
| } |
| |
| String type = args.get(1); |
| if(OP_ALL_NAME.equals(type)) { |
| type = getOpName(); |
| return true; |
| } |
| if(!getOpName().equals(type)) |
| printUsage(); |
| return false; |
| } |
| |
| void printStats() { |
| LOG.info("--- " + getOpName() + " stats ---"); |
| LOG.info("# operations: " + getNumOpsExecuted()); |
| LOG.info("Elapsed Time: " + getElapsedTime()); |
| LOG.info(" Ops per sec: " + getOpsPerSecond()); |
| LOG.info("Average Time: " + getAverageTime()); |
| } |
| } |
| |
| /** |
| * One of the threads that perform stats operations. |
| */ |
| private class StatsDaemon extends Thread { |
| private int daemonId; |
| private int opsPerThread; |
| private String arg1; // argument passed to executeOp() |
| private volatile int localNumOpsExecuted = 0; |
| private volatile long localCumulativeTime = 0; |
| private OperationStatsBase statsOp; |
| |
| StatsDaemon(int daemonId, int nrOps, OperationStatsBase op) { |
| this.daemonId = daemonId; |
| this.opsPerThread = nrOps; |
| this.statsOp = op; |
| setName(toString()); |
| } |
| |
| public void run() { |
| localNumOpsExecuted = 0; |
| localCumulativeTime = 0; |
| arg1 = statsOp.getExecutionArgument(daemonId); |
| try { |
| benchmarkOne(); |
| } catch(IOException ex) { |
| LOG.error("StatsDaemon " + daemonId + " failed: \n" |
| + StringUtils.stringifyException(ex)); |
| } |
| } |
| |
| public String toString() { |
| return "StatsDaemon-" + daemonId; |
| } |
| |
| void benchmarkOne() throws IOException { |
| for(int idx = 0; idx < opsPerThread; idx++) { |
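        // purge the name-node's user-to-groups cache every ugcRefreshCount
        // operations (enabled by the -UGCacheRefreshCount option)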
| if((localNumOpsExecuted+1) % statsOp.ugcRefreshCount == 0) |
| nameNode.refreshUserToGroupsMappings(); |
| long stat = statsOp.executeOp(daemonId, idx, arg1); |
| localNumOpsExecuted++; |
| localCumulativeTime += stat; |
| } |
| } |
| |
| boolean isInProgress() { |
| return localNumOpsExecuted < opsPerThread; |
| } |
| |
| /** |
     * Schedule this daemon to stop.
| */ |
| void terminate() { |
| opsPerThread = localNumOpsExecuted; |
| } |
| } |
| |
| /** |
| * Clean all benchmark result directories. |
| */ |
| class CleanAllStats extends OperationStatsBase { |
| // Operation types |
| static final String OP_CLEAN_NAME = "clean"; |
| static final String OP_CLEAN_USAGE = "-op clean"; |
| |
| CleanAllStats(List<String> args) { |
| super(); |
| parseArguments(args); |
| numOpsRequired = 1; |
| numThreads = 1; |
| keepResults = true; |
| } |
| |
| String getOpName() { |
| return OP_CLEAN_NAME; |
| } |
| |
| void parseArguments(List<String> args) { |
| boolean ignoreUnrelatedOptions = verifyOpArgument(args); |
| if(args.size() > 2 && !ignoreUnrelatedOptions) |
| printUsage(); |
| } |
| |
| void generateInputs(int[] opsPerThread) throws IOException { |
| // do nothing |
| } |
| |
| /** |
| * Does not require the argument |
| */ |
| String getExecutionArgument(int daemonId) { |
| return null; |
| } |
| |
| /** |
| * Remove entire benchmark directory. |
| */ |
| long executeOp(int daemonId, int inputIdx, String ignore) |
| throws IOException { |
| nameNode.setSafeMode(FSConstants.SafeModeAction.SAFEMODE_LEAVE); |
| long start = System.currentTimeMillis(); |
| nameNode.delete(BASE_DIR_NAME, true); |
| long end = System.currentTimeMillis(); |
| return end-start; |
| } |
| |
| void printResults() { |
| LOG.info("--- " + getOpName() + " inputs ---"); |
| LOG.info("Remove directory " + BASE_DIR_NAME); |
| printStats(); |
| } |
| } |
| |
| /** |
| * File creation statistics. |
| * |
| * Each thread creates the same (+ or -1) number of files. |
| * File names are pre-generated during initialization. |
| * The created files do not have blocks. |
| */ |
| class CreateFileStats extends OperationStatsBase { |
| // Operation types |
| static final String OP_CREATE_NAME = "create"; |
| static final String OP_CREATE_USAGE = |
| "-op create [-threads T] [-files N] [-filesPerDir P] [-close]"; |
| |
| protected FileNameGenerator nameGenerator; |
| protected String[][] fileNames; |
| private boolean closeUponCreate; |
| |
| CreateFileStats(List<String> args) { |
| super(); |
| parseArguments(args); |
| } |
| |
| String getOpName() { |
| return OP_CREATE_NAME; |
| } |
| |
| void parseArguments(List<String> args) { |
| boolean ignoreUnrelatedOptions = verifyOpArgument(args); |
| int nrFilesPerDir = 4; |
| closeUponCreate = false; |
| for (int i = 2; i < args.size(); i++) { // parse command line |
| if(args.get(i).equals("-files")) { |
| if(i+1 == args.size()) printUsage(); |
| numOpsRequired = Integer.parseInt(args.get(++i)); |
| } else if(args.get(i).equals("-threads")) { |
| if(i+1 == args.size()) printUsage(); |
| numThreads = Integer.parseInt(args.get(++i)); |
| } else if(args.get(i).equals("-filesPerDir")) { |
| if(i+1 == args.size()) printUsage(); |
| nrFilesPerDir = Integer.parseInt(args.get(++i)); |
| } else if(args.get(i).equals("-close")) { |
| closeUponCreate = true; |
| } else if(!ignoreUnrelatedOptions) |
| printUsage(); |
| } |
| nameGenerator = new FileNameGenerator(getBaseDir(), nrFilesPerDir); |
| } |
| |
| void generateInputs(int[] opsPerThread) throws IOException { |
| assert opsPerThread.length == numThreads : "Error opsPerThread.length"; |
| nameNode.setSafeMode(FSConstants.SafeModeAction.SAFEMODE_LEAVE); |
| // int generatedFileIdx = 0; |
| LOG.info("Generate " + numOpsRequired + " intputs for " + getOpName()); |
| fileNames = new String[numThreads][]; |
| for(int idx=0; idx < numThreads; idx++) { |
| int threadOps = opsPerThread[idx]; |
| fileNames[idx] = new String[threadOps]; |
| for(int jdx=0; jdx < threadOps; jdx++) |
| fileNames[idx][jdx] = nameGenerator. |
| getNextFileName("ThroughputBench"); |
| } |
| } |
| |
| void dummyActionNoSynch(int daemonId, int fileIdx) { |
| for(int i=0; i < 2000; i++) |
| fileNames[daemonId][fileIdx].contains(""+i); |
| } |
| |
| /** |
     * Returns the client name.
| */ |
| String getExecutionArgument(int daemonId) { |
| return getClientName(daemonId); |
| } |
| |
| /** |
| * Do file create. |
| */ |
| long executeOp(int daemonId, int inputIdx, String clientName) |
| throws IOException { |
| long start = System.currentTimeMillis(); |
| // dummyActionNoSynch(fileIdx); |
| nameNode.create(fileNames[daemonId][inputIdx], FsPermission.getDefault(), |
| clientName, new EnumSetWritable<CreateFlag>(EnumSet |
| .of(CreateFlag.CREATE, CreateFlag.OVERWRITE)), true, replication, BLOCK_SIZE); |
| long end = System.currentTimeMillis(); |
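      // if -close was specified, keep calling complete() until the name-node
      // acknowledges that the file is closed; this time is not included in
      // the measured create time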
| for(boolean written = !closeUponCreate; !written; |
| written = nameNode.complete(fileNames[daemonId][inputIdx], |
| clientName, null)); |
| return end-start; |
| } |
| |
| void printResults() { |
| LOG.info("--- " + getOpName() + " inputs ---"); |
| LOG.info("nrFiles = " + numOpsRequired); |
| LOG.info("nrThreads = " + numThreads); |
| LOG.info("nrFilesPerDir = " + nameGenerator.getFilesPerDirectory()); |
| printStats(); |
| } |
| } |
| |
| /** |
| * Open file statistics. |
| * |
| * Measure how many open calls (getBlockLocations()) |
| * the name-node can handle per second. |
| */ |
| class OpenFileStats extends CreateFileStats { |
| // Operation types |
| static final String OP_OPEN_NAME = "open"; |
| static final String OP_USAGE_ARGS = |
| " [-threads T] [-files N] [-filesPerDir P] [-useExisting]"; |
| static final String OP_OPEN_USAGE = |
| "-op " + OP_OPEN_NAME + OP_USAGE_ARGS; |
| |
| private boolean useExisting; // do not generate files, use existing ones |
| |
| OpenFileStats(List<String> args) { |
| super(args); |
| } |
| |
| String getOpName() { |
| return OP_OPEN_NAME; |
| } |
| |
| void parseArguments(List<String> args) { |
| int ueIndex = args.indexOf("-useExisting"); |
| useExisting = (ueIndex >= 0); |
| if(useExisting) { |
| args.remove(ueIndex); |
| } |
| super.parseArguments(args); |
| } |
| |
| @SuppressWarnings("deprecation") |
| void generateInputs(int[] opsPerThread) throws IOException { |
| // create files using opsPerThread |
| String[] createArgs = new String[] { |
| "-op", "create", |
| "-threads", String.valueOf(this.numThreads), |
| "-files", String.valueOf(numOpsRequired), |
| "-filesPerDir", |
| String.valueOf(nameGenerator.getFilesPerDirectory()), |
| "-close"}; |
| CreateFileStats opCreate = new CreateFileStats(Arrays.asList(createArgs)); |
| |
| if(!useExisting) { // create files if they were not created before |
| opCreate.benchmark(); |
| LOG.info("Created " + numOpsRequired + " files."); |
| } else { |
| LOG.info("useExisting = true. Assuming " |
| + numOpsRequired + " files have been created before."); |
| } |
| // use the same files for open |
| super.generateInputs(opsPerThread); |
| if(nameNode.getFileInfo(opCreate.getBaseDir()) != null |
| && nameNode.getFileInfo(getBaseDir()) == null) { |
| nameNode.rename(opCreate.getBaseDir(), getBaseDir()); |
| } |
| if(nameNode.getFileInfo(getBaseDir()) == null) { |
| throw new IOException(getBaseDir() + " does not exist."); |
| } |
| } |
| |
| /** |
| * Do file open. |
| */ |
| long executeOp(int daemonId, int inputIdx, String ignore) |
| throws IOException { |
| long start = System.currentTimeMillis(); |
| nameNode.getBlockLocations(fileNames[daemonId][inputIdx], 0L, BLOCK_SIZE); |
| long end = System.currentTimeMillis(); |
| return end-start; |
| } |
| } |
| |
| /** |
| * Delete file statistics. |
| * |
| * Measure how many delete calls the name-node can handle per second. |
| */ |
| class DeleteFileStats extends OpenFileStats { |
| // Operation types |
| static final String OP_DELETE_NAME = "delete"; |
| static final String OP_DELETE_USAGE = |
| "-op " + OP_DELETE_NAME + OP_USAGE_ARGS; |
| |
| DeleteFileStats(List<String> args) { |
| super(args); |
| } |
| |
| String getOpName() { |
| return OP_DELETE_NAME; |
| } |
| |
| long executeOp(int daemonId, int inputIdx, String ignore) |
| throws IOException { |
| long start = System.currentTimeMillis(); |
| nameNode.delete(fileNames[daemonId][inputIdx], false); |
| long end = System.currentTimeMillis(); |
| return end-start; |
| } |
| } |
| |
| /** |
| * List file status statistics. |
| * |
| * Measure how many get-file-status calls the name-node can handle per second. |
| */ |
| class FileStatusStats extends OpenFileStats { |
| // Operation types |
| static final String OP_FILE_STATUS_NAME = "fileStatus"; |
| static final String OP_FILE_STATUS_USAGE = |
| "-op " + OP_FILE_STATUS_NAME + OP_USAGE_ARGS; |
| |
| FileStatusStats(List<String> args) { |
| super(args); |
| } |
| |
| String getOpName() { |
| return OP_FILE_STATUS_NAME; |
| } |
| |
| long executeOp(int daemonId, int inputIdx, String ignore) |
| throws IOException { |
| long start = System.currentTimeMillis(); |
| nameNode.getFileInfo(fileNames[daemonId][inputIdx]); |
| long end = System.currentTimeMillis(); |
| return end-start; |
| } |
| } |
| |
| /** |
| * Rename file statistics. |
| * |
| * Measure how many rename calls the name-node can handle per second. |
| */ |
| class RenameFileStats extends OpenFileStats { |
| // Operation types |
| static final String OP_RENAME_NAME = "rename"; |
| static final String OP_RENAME_USAGE = |
| "-op " + OP_RENAME_NAME + OP_USAGE_ARGS; |
| |
| protected String[][] destNames; |
| |
| RenameFileStats(List<String> args) { |
| super(args); |
| } |
| |
| String getOpName() { |
| return OP_RENAME_NAME; |
| } |
| |
| void generateInputs(int[] opsPerThread) throws IOException { |
| super.generateInputs(opsPerThread); |
| destNames = new String[fileNames.length][]; |
| for(int idx=0; idx < numThreads; idx++) { |
| int nrNames = fileNames[idx].length; |
| destNames[idx] = new String[nrNames]; |
| for(int jdx=0; jdx < nrNames; jdx++) |
| destNames[idx][jdx] = fileNames[idx][jdx] + ".r"; |
| } |
| } |
| |
| @SuppressWarnings("deprecation") |
| long executeOp(int daemonId, int inputIdx, String ignore) |
| throws IOException { |
| long start = System.currentTimeMillis(); |
| nameNode.rename(fileNames[daemonId][inputIdx], |
| destNames[daemonId][inputIdx]); |
| long end = System.currentTimeMillis(); |
| return end-start; |
| } |
| } |
| |
| /** |
| * Minimal data-node simulator. |
| */ |
| private static class TinyDatanode implements Comparable<String> { |
| private static final long DF_CAPACITY = 100*1024*1024; |
| private static final long DF_USED = 0; |
| |
| NamespaceInfo nsInfo; |
| DatanodeRegistration dnRegistration; |
| ArrayList<Block> blocks; |
| int nrBlocks; // actual number of blocks |
| long[] blockReportList; |
| |
| /** |
     * Return the data-node name in the form
     * &lt;host name&gt;:&lt;port&gt;
     * where port is a 6-digit integer.
     * This is necessary in order to provide lexicographic ordering.
     * Host names are all the same; the ordering goes by port numbers.
| */ |
| private static String getNodeName(int port) throws IOException { |
| String machineName = DNS.getDefaultHost("default", "default"); |
| String sPort = String.valueOf(100000 + port); |
| if(sPort.length() > 6) |
| throw new IOException("Too many data-nodes."); |
| return machineName + ":" + sPort; |
| } |
| |
| TinyDatanode(int dnIdx, int blockCapacity) throws IOException { |
| dnRegistration = new DatanodeRegistration(getNodeName(dnIdx)); |
| this.blocks = new ArrayList<Block>(blockCapacity); |
| this.nrBlocks = 0; |
| } |
| |
| String getName() { |
| return dnRegistration.getName(); |
| } |
| |
| void register() throws IOException { |
| // get versions from the namenode |
| nsInfo = nameNode.versionRequest(); |
| dnRegistration.setStorageInfo(new DataStorage(nsInfo, "")); |
| DataNode.setNewStorageID(dnRegistration); |
| // register datanode |
| dnRegistration = nameNode.registerDatanode(dnRegistration); |
| } |
| |
| /** |
| * Send a heartbeat to the name-node. |
| * Ignore reply commands. |
| */ |
| void sendHeartbeat() throws IOException { |
      // send a heartbeat to the name-node, ignoring reply commands
| // TODO:FEDERATION currently a single block pool is supported |
| DatanodeCommand[] cmds = nameNode.sendHeartbeat(dnRegistration, |
| DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED, 0, 0, 0); |
| if(cmds != null) { |
| for (DatanodeCommand cmd : cmds ) { |
| if(LOG.isDebugEnabled()) { |
| LOG.debug("sendHeartbeat Name-node reply: " + cmd.getAction()); |
| } |
| } |
| } |
| } |
| |
| boolean addBlock(Block blk) { |
| if(nrBlocks == blocks.size()) { |
| if(LOG.isDebugEnabled()) { |
| LOG.debug("Cannot add block: datanode capacity = " + blocks.size()); |
| } |
| return false; |
| } |
| blocks.set(nrBlocks, blk); |
| nrBlocks++; |
| return true; |
| } |
| |
| void formBlockReport() { |
| // fill remaining slots with blocks that do not exist |
| for(int idx = blocks.size()-1; idx >= nrBlocks; idx--) |
| blocks.set(idx, new Block(blocks.size() - idx, 0, 0)); |
| blockReportList = new BlockListAsLongs(blocks,null).getBlockListAsLongs(); |
| } |
| |
| long[] getBlockReportList() { |
| return blockReportList; |
| } |
| |
| public int compareTo(String name) { |
| return getName().compareTo(name); |
| } |
| |
| /** |
| * Send a heartbeat to the name-node and replicate blocks if requested. |
| */ |
| @SuppressWarnings("unused") // keep it for future blockReceived benchmark |
| int replicateBlocks() throws IOException { |
      // send a heartbeat to the name-node and process a transfer command if any
| // TODO:FEDERATION currently a single block pool is supported |
| DatanodeCommand[] cmds = nameNode.sendHeartbeat(dnRegistration, |
| DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED, 0, 0, 0); |
| if (cmds != null) { |
| for (DatanodeCommand cmd : cmds) { |
| if (cmd.getAction() == DatanodeProtocol.DNA_TRANSFER) { |
| // Send a copy of a block to another datanode |
| BlockCommand bcmd = (BlockCommand)cmd; |
| return transferBlocks(bcmd.getBlocks(), bcmd.getTargets()); |
| } |
| } |
| } |
| return 0; |
| } |
| |
| /** |
| * Transfer blocks to another data-node. |
| * Just report on behalf of the other data-node |
| * that the blocks have been received. |
| */ |
| private int transferBlocks( Block blocks[], |
| DatanodeInfo xferTargets[][] |
| ) throws IOException { |
| for(int i = 0; i < blocks.length; i++) { |
| DatanodeInfo blockTargets[] = xferTargets[i]; |
| for(int t = 0; t < blockTargets.length; t++) { |
| DatanodeInfo dnInfo = blockTargets[t]; |
| DatanodeRegistration receivedDNReg; |
| receivedDNReg = new DatanodeRegistration(dnInfo.getName()); |
| receivedDNReg.setStorageInfo( |
| new DataStorage(nsInfo, dnInfo.getStorageID())); |
| receivedDNReg.setInfoPort(dnInfo.getInfoPort()); |
| nameNode.blockReceived( receivedDNReg, |
| nameNode.getNamesystem().getBlockPoolId(), |
| new Block[] {blocks[i]}, |
| new String[] {DataNode.EMPTY_DEL_HINT}); |
| } |
| } |
| return blocks.length; |
| } |
| } |
| |
| /** |
| * Block report statistics. |
| * |
| * Each thread here represents its own data-node. |
| * Data-nodes send the same block report each time. |
| * The block report may contain missing or non-existing blocks. |
| */ |
| class BlockReportStats extends OperationStatsBase { |
| static final String OP_BLOCK_REPORT_NAME = "blockReport"; |
| static final String OP_BLOCK_REPORT_USAGE = |
| "-op blockReport [-datanodes T] [-reports N] " + |
| "[-blocksPerReport B] [-blocksPerFile F]"; |
| |
| private int blocksPerReport; |
| private int blocksPerFile; |
| private TinyDatanode[] datanodes; // array of data-nodes sorted by name |
| |
| BlockReportStats(List<String> args) { |
| super(); |
| this.blocksPerReport = 100; |
| this.blocksPerFile = 10; |
      // set heartbeat interval to 3 minutes, so that the data-node
      // expiration interval becomes 40 minutes
| config.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 3 * 60); |
| parseArguments(args); |
| // adjust replication to the number of data-nodes |
| this.replication = (short)Math.min((int)replication, getNumDatanodes()); |
| } |
| |
| /** |
     * Each thread pretends it is a data-node here.
| */ |
| private int getNumDatanodes() { |
| return numThreads; |
| } |
| |
| String getOpName() { |
| return OP_BLOCK_REPORT_NAME; |
| } |
| |
| void parseArguments(List<String> args) { |
| boolean ignoreUnrelatedOptions = verifyOpArgument(args); |
| for (int i = 2; i < args.size(); i++) { // parse command line |
| if(args.get(i).equals("-reports")) { |
| if(i+1 == args.size()) printUsage(); |
| numOpsRequired = Integer.parseInt(args.get(++i)); |
| } else if(args.get(i).equals("-datanodes")) { |
| if(i+1 == args.size()) printUsage(); |
| numThreads = Integer.parseInt(args.get(++i)); |
| } else if(args.get(i).equals("-blocksPerReport")) { |
| if(i+1 == args.size()) printUsage(); |
| blocksPerReport = Integer.parseInt(args.get(++i)); |
| } else if(args.get(i).equals("-blocksPerFile")) { |
| if(i+1 == args.size()) printUsage(); |
| blocksPerFile = Integer.parseInt(args.get(++i)); |
| } else if(!ignoreUnrelatedOptions) |
| printUsage(); |
| } |
| } |
| |
| void generateInputs(int[] ignore) throws IOException { |
| int nrDatanodes = getNumDatanodes(); |
| int nrBlocks = (int)Math.ceil((double)blocksPerReport * nrDatanodes |
| / replication); |
| int nrFiles = (int)Math.ceil((double)nrBlocks / blocksPerFile); |
| datanodes = new TinyDatanode[nrDatanodes]; |
| // create data-nodes |
| String prevDNName = ""; |
| for(int idx=0; idx < nrDatanodes; idx++) { |
| datanodes[idx] = new TinyDatanode(idx, blocksPerReport); |
| datanodes[idx].register(); |
| assert datanodes[idx].getName().compareTo(prevDNName) > 0 |
| : "Data-nodes must be sorted lexicographically."; |
| datanodes[idx].sendHeartbeat(); |
| prevDNName = datanodes[idx].getName(); |
| } |
| |
| // create files |
| LOG.info("Creating " + nrFiles + " with " + blocksPerFile + " blocks each."); |
| FileNameGenerator nameGenerator; |
| nameGenerator = new FileNameGenerator(getBaseDir(), 100); |
| String clientName = getClientName(007); |
| nameNode.setSafeMode(FSConstants.SafeModeAction.SAFEMODE_LEAVE); |
| for(int idx=0; idx < nrFiles; idx++) { |
| String fileName = nameGenerator.getNextFileName("ThroughputBench"); |
| nameNode.create(fileName, FsPermission.getDefault(), clientName, |
| new EnumSetWritable<CreateFlag>(EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)), true, replication, |
| BLOCK_SIZE); |
| ExtendedBlock lastBlock = addBlocks(fileName, clientName); |
| nameNode.complete(fileName, clientName, lastBlock); |
| } |
| // prepare block reports |
| for(int idx=0; idx < nrDatanodes; idx++) { |
| datanodes[idx].formBlockReport(); |
| } |
| } |
| |
| private ExtendedBlock addBlocks(String fileName, String clientName) |
| throws IOException { |
| ExtendedBlock prevBlock = null; |
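      // allocate blocksPerFile blocks for the file; for each replica location
      // returned by addBlock(), record the block on the simulated data-node
      // and acknowledge it to the name-node via blockReceived()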
| for(int jdx = 0; jdx < blocksPerFile; jdx++) { |
| LocatedBlock loc = nameNode.addBlock(fileName, clientName, prevBlock, null); |
| prevBlock = loc.getBlock(); |
| for(DatanodeInfo dnInfo : loc.getLocations()) { |
| int dnIdx = Arrays.binarySearch(datanodes, dnInfo.getName()); |
| datanodes[dnIdx].addBlock(loc.getBlock().getLocalBlock()); |
| nameNode.blockReceived( |
| datanodes[dnIdx].dnRegistration, |
| loc.getBlock().getBlockPoolId(), |
| new Block[] {loc.getBlock().getLocalBlock()}, |
| new String[] {""}); |
| } |
| } |
| return prevBlock; |
| } |
| |
| /** |
| * Does not require the argument |
| */ |
| String getExecutionArgument(int daemonId) { |
| return null; |
| } |
| |
| long executeOp(int daemonId, int inputIdx, String ignore) throws IOException { |
| assert daemonId < numThreads : "Wrong daemonId."; |
| TinyDatanode dn = datanodes[daemonId]; |
| long start = System.currentTimeMillis(); |
| nameNode.blockReport(dn.dnRegistration, nameNode.getNamesystem() |
| .getBlockPoolId(), dn.getBlockReportList()); |
| long end = System.currentTimeMillis(); |
| return end-start; |
| } |
| |
| void printResults() { |
| String blockDistribution = ""; |
| String delim = "("; |
| for(int idx=0; idx < getNumDatanodes(); idx++) { |
| blockDistribution += delim + datanodes[idx].nrBlocks; |
| delim = ", "; |
| } |
| blockDistribution += ")"; |
| LOG.info("--- " + getOpName() + " inputs ---"); |
| LOG.info("reports = " + numOpsRequired); |
| LOG.info("datanodes = " + numThreads + " " + blockDistribution); |
| LOG.info("blocksPerReport = " + blocksPerReport); |
| LOG.info("blocksPerFile = " + blocksPerFile); |
| printStats(); |
| } |
| } // end BlockReportStats |
| |
| /** |
   * Measures how fast the replication monitor can compute data-node work.
| * |
| * It runs only one thread until no more work can be scheduled. |
| */ |
| class ReplicationStats extends OperationStatsBase { |
| static final String OP_REPLICATION_NAME = "replication"; |
| static final String OP_REPLICATION_USAGE = |
| "-op replication [-datanodes T] [-nodesToDecommission D] " + |
| "[-nodeReplicationLimit C] [-totalBlocks B] [-replication R]"; |
| |
| private BlockReportStats blockReportObject; |
| private int numDatanodes; |
| private int nodesToDecommission; |
| private int nodeReplicationLimit; |
| private int totalBlocks; |
| private int numDecommissionedBlocks; |
| private int numPendingBlocks; |
| |
| ReplicationStats(List<String> args) { |
| super(); |
| numThreads = 1; |
| numDatanodes = 3; |
| nodesToDecommission = 1; |
| nodeReplicationLimit = 100; |
| totalBlocks = 100; |
| parseArguments(args); |
| // number of operations is 4 times the number of decommissioned |
| // blocks divided by the number of needed replications scanned |
| // by the replication monitor in one iteration |
| numOpsRequired = (totalBlocks*replication*nodesToDecommission*2) |
| / (numDatanodes*numDatanodes); |
| |
| String[] blkReportArgs = { |
| "-op", "blockReport", |
| "-datanodes", String.valueOf(numDatanodes), |
| "-blocksPerReport", String.valueOf(totalBlocks*replication/numDatanodes), |
| "-blocksPerFile", String.valueOf(numDatanodes)}; |
| blockReportObject = new BlockReportStats(Arrays.asList(blkReportArgs)); |
| numDecommissionedBlocks = 0; |
| numPendingBlocks = 0; |
| } |
| |
| String getOpName() { |
| return OP_REPLICATION_NAME; |
| } |
| |
| void parseArguments(List<String> args) { |
| boolean ignoreUnrelatedOptions = verifyOpArgument(args); |
| for (int i = 2; i < args.size(); i++) { // parse command line |
| if(args.get(i).equals("-datanodes")) { |
| if(i+1 == args.size()) printUsage(); |
| numDatanodes = Integer.parseInt(args.get(++i)); |
| } else if(args.get(i).equals("-nodesToDecommission")) { |
| if(i+1 == args.size()) printUsage(); |
| nodesToDecommission = Integer.parseInt(args.get(++i)); |
| } else if(args.get(i).equals("-nodeReplicationLimit")) { |
| if(i+1 == args.size()) printUsage(); |
| nodeReplicationLimit = Integer.parseInt(args.get(++i)); |
| } else if(args.get(i).equals("-totalBlocks")) { |
| if(i+1 == args.size()) printUsage(); |
| totalBlocks = Integer.parseInt(args.get(++i)); |
| } else if(args.get(i).equals("-replication")) { |
| if(i+1 == args.size()) printUsage(); |
| replication = Short.parseShort(args.get(++i)); |
| } else if(!ignoreUnrelatedOptions) |
| printUsage(); |
| } |
| } |
| |
| void generateInputs(int[] ignore) throws IOException { |
| final FSNamesystem namesystem = nameNode.getNamesystem(); |
| |
| // start data-nodes; create a bunch of files; generate block reports. |
| blockReportObject.generateInputs(ignore); |
| // stop replication monitor |
| namesystem.replthread.interrupt(); |
| try { |
| namesystem.replthread.join(); |
| } catch(InterruptedException ei) { |
| return; |
| } |
| // report blocks once |
| int nrDatanodes = blockReportObject.getNumDatanodes(); |
| for(int idx=0; idx < nrDatanodes; idx++) { |
| blockReportObject.executeOp(idx, 0, null); |
| } |
| // decommission data-nodes |
| decommissionNodes(); |
| // set node replication limit |
| namesystem.setNodeReplicationLimit(nodeReplicationLimit); |
| } |
| |
| private void decommissionNodes() throws IOException { |
| String excludeFN = config.get(DFSConfigKeys.DFS_HOSTS_EXCLUDE, "exclude"); |
| FileOutputStream excludeFile = new FileOutputStream(excludeFN); |
| excludeFile.getChannel().truncate(0L); |
| int nrDatanodes = blockReportObject.getNumDatanodes(); |
| numDecommissionedBlocks = 0; |
| for(int i=0; i < nodesToDecommission; i++) { |
| TinyDatanode dn = blockReportObject.datanodes[nrDatanodes-1-i]; |
| numDecommissionedBlocks += dn.nrBlocks; |
| excludeFile.write(dn.getName().getBytes()); |
| excludeFile.write('\n'); |
| LOG.info("Datanode " + dn.getName() + " is decommissioned."); |
| } |
| excludeFile.close(); |
| nameNode.refreshNodes(); |
| } |
| |
| /** |
| * Does not require the argument |
| */ |
| String getExecutionArgument(int daemonId) { |
| return null; |
| } |
| |
| long executeOp(int daemonId, int inputIdx, String ignore) throws IOException { |
| assert daemonId < numThreads : "Wrong daemonId."; |
| long start = System.currentTimeMillis(); |
| // compute data-node work |
| int work = nameNode.getNamesystem().computeDatanodeWork(); |
| long end = System.currentTimeMillis(); |
| numPendingBlocks += work; |
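      // no more replication work could be scheduled, so stop this daemon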
| if(work == 0) |
| daemons.get(daemonId).terminate(); |
| return end-start; |
| } |
| |
| void printResults() { |
| String blockDistribution = ""; |
| String delim = "("; |
| int totalReplicas = 0; |
| for(int idx=0; idx < blockReportObject.getNumDatanodes(); idx++) { |
| totalReplicas += blockReportObject.datanodes[idx].nrBlocks; |
| blockDistribution += delim + blockReportObject.datanodes[idx].nrBlocks; |
| delim = ", "; |
| } |
| blockDistribution += ")"; |
| LOG.info("--- " + getOpName() + " inputs ---"); |
| LOG.info("numOpsRequired = " + numOpsRequired); |
| LOG.info("datanodes = " + numDatanodes + " " + blockDistribution); |
| LOG.info("decommissioned datanodes = " + nodesToDecommission); |
| LOG.info("datanode replication limit = " + nodeReplicationLimit); |
| LOG.info("total blocks = " + totalBlocks); |
| printStats(); |
| LOG.info("decommissioned blocks = " + numDecommissionedBlocks); |
| LOG.info("pending replications = " + numPendingBlocks); |
| LOG.info("replications per sec: " + getBlocksPerSecond()); |
| } |
| |
| private double getBlocksPerSecond() { |
| return elapsedTime == 0 ? 0 : 1000*(double)numPendingBlocks / elapsedTime; |
| } |
| |
| } // end ReplicationStats |
| |
| static void printUsage() { |
| System.err.println("Usage: NNThroughputBenchmark" |
| + "\n\t" + OperationStatsBase.OP_ALL_USAGE |
| + " | \n\t" + CreateFileStats.OP_CREATE_USAGE |
| + " | \n\t" + OpenFileStats.OP_OPEN_USAGE |
| + " | \n\t" + DeleteFileStats.OP_DELETE_USAGE |
| + " | \n\t" + FileStatusStats.OP_FILE_STATUS_USAGE |
| + " | \n\t" + RenameFileStats.OP_RENAME_USAGE |
| + " | \n\t" + BlockReportStats.OP_BLOCK_REPORT_USAGE |
| + " | \n\t" + ReplicationStats.OP_REPLICATION_USAGE |
| + " | \n\t" + CleanAllStats.OP_CLEAN_USAGE |
| + " | \n\t" + GENERAL_OPTIONS_USAGE |
| ); |
| System.exit(-1); |
| } |
| |
| /** |
   * Run the benchmark operations specified by the command line parameters.
   * @param conf configuration used to start the name-node
   * @param args command line parameters
| */ |
| public static void runBenchmark(Configuration conf, List<String> args) throws Exception { |
| if(args.size() < 2 || ! args.get(0).startsWith("-op")) |
| printUsage(); |
| |
| String type = args.get(1); |
| boolean runAll = OperationStatsBase.OP_ALL_NAME.equals(type); |
| |
| NNThroughputBenchmark bench = null; |
| List<OperationStatsBase> ops = new ArrayList<OperationStatsBase>(); |
| OperationStatsBase opStat = null; |
| try { |
| bench = new NNThroughputBenchmark(conf); |
| if(runAll || CreateFileStats.OP_CREATE_NAME.equals(type)) { |
| opStat = bench.new CreateFileStats(args); |
| ops.add(opStat); |
| } |
| if(runAll || OpenFileStats.OP_OPEN_NAME.equals(type)) { |
| opStat = bench.new OpenFileStats(args); |
| ops.add(opStat); |
| } |
| if(runAll || DeleteFileStats.OP_DELETE_NAME.equals(type)) { |
| opStat = bench.new DeleteFileStats(args); |
| ops.add(opStat); |
| } |
| if(runAll || FileStatusStats.OP_FILE_STATUS_NAME.equals(type)) { |
| opStat = bench.new FileStatusStats(args); |
| ops.add(opStat); |
| } |
| if(runAll || RenameFileStats.OP_RENAME_NAME.equals(type)) { |
| opStat = bench.new RenameFileStats(args); |
| ops.add(opStat); |
| } |
| if(runAll || BlockReportStats.OP_BLOCK_REPORT_NAME.equals(type)) { |
| opStat = bench.new BlockReportStats(args); |
| ops.add(opStat); |
| } |
| if(runAll || ReplicationStats.OP_REPLICATION_NAME.equals(type)) { |
| opStat = bench.new ReplicationStats(args); |
| ops.add(opStat); |
| } |
| if(runAll || CleanAllStats.OP_CLEAN_NAME.equals(type)) { |
| opStat = bench.new CleanAllStats(args); |
| ops.add(opStat); |
| } |
| if(ops.size() == 0) |
| printUsage(); |
| // run each benchmark |
| for(OperationStatsBase op : ops) { |
| LOG.info("Starting benchmark: " + op.getOpName()); |
| op.benchmark(); |
| op.cleanUp(); |
| } |
| // print statistics |
| for(OperationStatsBase op : ops) { |
| LOG.info(""); |
| op.printResults(); |
| } |
| } catch(Exception e) { |
| LOG.error(StringUtils.stringifyException(e)); |
| throw e; |
| } finally { |
| if(bench != null) |
| bench.close(); |
| } |
| } |
| |
| public static void main(String[] args) throws Exception { |
| runBenchmark(new HdfsConfiguration(), |
| new ArrayList<String>(Arrays.asList(args))); |
| } |
| } |