| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hdfs.server.balancer; |
| |
| import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_KEY; |
| import static org.apache.hadoop.fs.StorageType.DEFAULT; |
| import static org.apache.hadoop.fs.StorageType.RAM_DISK; |
| import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_HTTPS_KEYSTORE_RESOURCE_KEY; |
| import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SERVER_HTTPS_KEYSTORE_RESOURCE_KEY; |
| import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY; |
| import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BALANCER_ADDRESS_KEY; |
| import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BALANCER_KERBEROS_PRINCIPAL_KEY; |
| import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BALANCER_KEYTAB_ENABLED_KEY; |
| import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BALANCER_KEYTAB_FILE_KEY; |
| import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY; |
| import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY; |
| import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BLOCK_PINNING_ENABLED; |
| import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY; |
| import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY; |
| import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY; |
| import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC; |
| import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY; |
| import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY; |
| import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HTTP_POLICY_KEY; |
| import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY; |
| import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY; |
| import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY; |
| import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY; |
| import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC; |
| import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertTrue; |
| import static org.junit.Assert.fail; |
| import static org.junit.Assume.assumeTrue; |
| import static org.mockito.Mockito.any; |
| import static org.mockito.Mockito.anyLong; |
| import static org.mockito.Mockito.doAnswer; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.io.OutputStream; |
| import java.io.PrintWriter; |
| import java.net.InetAddress; |
| import java.net.InetSocketAddress; |
| import java.net.URI; |
| import java.security.PrivilegedExceptionAction; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Properties; |
| import java.util.Random; |
| import java.util.Set; |
| import java.util.concurrent.TimeoutException; |
| |
| import org.apache.commons.lang.StringUtils; |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.HadoopIllegalArgumentException; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FSDataOutputStream; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.FileUtil; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.fs.StorageType; |
| import org.apache.hadoop.fs.permission.FsPermission; |
| import org.apache.hadoop.hdfs.DFSClient; |
| import org.apache.hadoop.hdfs.DFSConfigKeys; |
| import org.apache.hadoop.hdfs.DFSTestUtil; |
| import org.apache.hadoop.hdfs.DFSUtil; |
| import org.apache.hadoop.hdfs.DistributedFileSystem; |
| import org.apache.hadoop.hdfs.HdfsConfiguration; |
| import org.apache.hadoop.hdfs.MiniDFSCluster; |
| import org.apache.hadoop.hdfs.NameNodeProxies; |
| import org.apache.hadoop.hdfs.protocol.Block; |
| import org.apache.hadoop.hdfs.protocol.ClientProtocol; |
| import org.apache.hadoop.hdfs.protocol.DatanodeID; |
| import org.apache.hadoop.hdfs.protocol.DatanodeInfo; |
| import org.apache.hadoop.hdfs.protocol.ExtendedBlock; |
| import org.apache.hadoop.hdfs.protocol.HdfsConstants; |
| import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; |
| import org.apache.hadoop.hdfs.protocol.LocatedBlock; |
| import org.apache.hadoop.hdfs.server.balancer.Balancer.Cli; |
| import org.apache.hadoop.hdfs.server.balancer.Balancer.Result; |
| import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy; |
| import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyWithUpgradeDomain; |
| import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementStatus; |
| import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager; |
| import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption; |
| import org.apache.hadoop.hdfs.server.datanode.DataNode; |
| import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset; |
| import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.LazyPersistTestCase; |
| import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; |
| import org.apache.hadoop.hdfs.server.namenode.NameNode; |
| import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; |
| import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations; |
| import org.apache.hadoop.http.HttpConfig; |
| import org.apache.hadoop.io.IOUtils; |
| import org.apache.hadoop.minikdc.MiniKdc; |
| import org.apache.hadoop.security.authentication.util.KerberosName; |
| import org.apache.hadoop.security.SecurityUtil; |
| import org.apache.hadoop.security.UserGroupInformation; |
| import org.apache.hadoop.security.ssl.KeyStoreTestUtil; |
| import org.apache.hadoop.test.GenericTestUtils; |
| import org.apache.hadoop.util.Time; |
| import org.apache.hadoop.util.Tool; |
| import org.apache.log4j.Level; |
| import org.junit.After; |
| import org.junit.AfterClass; |
| import org.junit.Test; |
| import org.mockito.invocation.InvocationOnMock; |
| import org.mockito.stubbing.Answer; |
| |
| /** |
| * This class tests whether the balancer schedules block moves correctly, |
| * covering secure clusters, include/exclude lists, pinned blocks, and |
| * RAM_DISK storage. |
| */ |
| public class TestBalancer { |
| private static final Log LOG = LogFactory.getLog(TestBalancer.class); |
| |
| static { |
| GenericTestUtils.setLogLevel(Balancer.LOG, Level.ALL); |
| GenericTestUtils.setLogLevel(Dispatcher.LOG, Level.DEBUG); |
| } |
| |
| final static long CAPACITY = 5000L; |
| final static String RACK0 = "/rack0"; |
| final static String RACK1 = "/rack1"; |
| final static String RACK2 = "/rack2"; |
| final private static String fileName = "/tmp.txt"; |
| final static Path filePath = new Path(fileName); |
| final static private String username = "balancer"; |
| private static String principal; |
| private static File baseDir; |
| private static String keystoresDir; |
| private static String sslConfDir; |
| private static MiniKdc kdc; |
| private static File keytabFile; |
| private MiniDFSCluster cluster; |
| |
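| /** |
| * Set up a secure test configuration: start a MiniKdc, create a shared |
| * keytab for the NameNode, DataNode and Balancer principals, enable |
| * HTTPS-only web endpoints, and generate the SSL keystores they need. |
| */ |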
| static void initSecureConf(Configuration conf) throws Exception { |
| baseDir = new File(System.getProperty("test.build.dir", "target/test-dir"), |
| TestBalancer.class.getSimpleName()); |
| FileUtil.fullyDelete(baseDir); |
| assertTrue(baseDir.mkdirs()); |
| |
| Properties kdcConf = MiniKdc.createConf(); |
| kdc = new MiniKdc(kdcConf, baseDir); |
| kdc.start(); |
| |
| SecurityUtil.setAuthenticationMethod( |
| UserGroupInformation.AuthenticationMethod.KERBEROS, conf); |
| UserGroupInformation.setConfiguration(conf); |
| KerberosName.resetDefaultRealm(); |
| assertTrue("Expected configuration to enable security", |
| UserGroupInformation.isSecurityEnabled()); |
| |
| keytabFile = new File(baseDir, username + ".keytab"); |
| String keytab = keytabFile.getAbsolutePath(); |
| // Windows will not reverse name lookup "127.0.0.1" to "localhost". |
| String krbInstance = Path.WINDOWS ? "127.0.0.1" : "localhost"; |
| principal = username + "/" + krbInstance + "@" + kdc.getRealm(); |
| String spnegoPrincipal = "HTTP/" + krbInstance + "@" + kdc.getRealm(); |
| kdc.createPrincipal(keytabFile, username, username + "/" + krbInstance, |
| "HTTP/" + krbInstance); |
| |
| conf.set(DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY, principal); |
| conf.set(DFS_NAMENODE_KEYTAB_FILE_KEY, keytab); |
| conf.set(DFS_DATANODE_KERBEROS_PRINCIPAL_KEY, principal); |
| conf.set(DFS_DATANODE_KEYTAB_FILE_KEY, keytab); |
| conf.set(DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY, spnegoPrincipal); |
| conf.setBoolean(DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true); |
| conf.set(DFS_DATA_TRANSFER_PROTECTION_KEY, "authentication"); |
| conf.set(DFS_HTTP_POLICY_KEY, HttpConfig.Policy.HTTPS_ONLY.name()); |
| conf.set(DFS_NAMENODE_HTTPS_ADDRESS_KEY, "localhost:0"); |
| conf.set(DFS_DATANODE_HTTPS_ADDRESS_KEY, "localhost:0"); |
| conf.setInt(IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_KEY, 10); |
| |
| conf.setBoolean(DFS_BALANCER_KEYTAB_ENABLED_KEY, true); |
| conf.set(DFS_BALANCER_ADDRESS_KEY, "localhost:0"); |
| conf.set(DFS_BALANCER_KEYTAB_FILE_KEY, keytab); |
| conf.set(DFS_BALANCER_KERBEROS_PRINCIPAL_KEY, principal); |
| |
| keystoresDir = baseDir.getAbsolutePath(); |
| sslConfDir = KeyStoreTestUtil.getClasspathDir(TestBalancer.class); |
| KeyStoreTestUtil.setupSSLConfig(keystoresDir, sslConfDir, conf, false); |
| |
| conf.set(DFS_CLIENT_HTTPS_KEYSTORE_RESOURCE_KEY, |
| KeyStoreTestUtil.getClientSSLConfigFileName()); |
| conf.set(DFS_SERVER_HTTPS_KEYSTORE_RESOURCE_KEY, |
| KeyStoreTestUtil.getServerSSLConfigFileName()); |
| initConf(conf); |
| } |
| |
| @After |
| public void shutdown() throws Exception { |
| if (cluster != null) { |
| cluster.shutdown(); |
| cluster = null; |
| } |
| } |
| |
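| // NameNode RPC proxy shared by the tests to query stats and block locations. |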
| ClientProtocol client; |
| |
| static final long TIMEOUT = 40000L; //msec |
| static final double CAPACITY_ALLOWED_VARIANCE = 0.005; // 0.5% |
| static final double BALANCE_ALLOWED_VARIANCE = 0.11; // 10%+delta |
| static final int DEFAULT_BLOCK_SIZE = 100; |
| static final int DEFAULT_RAM_DISK_BLOCK_SIZE = 5 * 1024 * 1024; |
| private static final Random r = new Random(); |
| |
| static { |
| initTestSetup(); |
| } |
| |
| public static void initTestSetup() { |
| // do not create the balancer ID file since it occupies disk space |
| NameNodeConnector.setWrite2IdFile(false); |
| } |
| |
| static void initConf(Configuration conf) { |
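| // Use a tiny block size and fast heartbeat/replication intervals so test |
| // clusters fill up and converge quickly, backed by a simulated dataset. |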
| conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE); |
| conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, DEFAULT_BLOCK_SIZE); |
| conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L); |
| conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 500); |
| conf.setLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1L); |
| SimulatedFSDataset.setFactory(conf); |
| |
| conf.setLong(DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY, 2000L); |
| conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1L); |
| conf.setInt(DFSConfigKeys.DFS_BALANCER_MAX_NO_MOVE_INTERVAL_KEY, 5*1000); |
| } |
| |
| static void initConfWithRamDisk(Configuration conf, |
| long ramDiskCapacity) { |
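| // Use the RAM disk block size, cap locked memory at the RAM disk capacity, |
| // and speed up the lazy writer and scrubber so lazy persistence is prompt. |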
| conf.setLong(DFS_BLOCK_SIZE_KEY, DEFAULT_RAM_DISK_BLOCK_SIZE); |
| conf.setLong(DFS_DATANODE_MAX_LOCKED_MEMORY_KEY, ramDiskCapacity); |
| conf.setInt(DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC, 3); |
| conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, 1); |
| conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 500); |
| conf.setInt(DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC, 1); |
| conf.setInt(DFSConfigKeys.DFS_BALANCER_MAX_NO_MOVE_INTERVAL_KEY, 5*1000); |
| LazyPersistTestCase.initCacheManipulator(); |
| |
| conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1L); |
| } |
| |
| @AfterClass |
| public static void destroy() throws Exception { |
| if (kdc != null) { |
| kdc.stop(); |
| FileUtil.fullyDelete(baseDir); |
| KeyStoreTestUtil.cleanupSSLConfig(keystoresDir, sslConfDir); |
| } |
| } |
| |
| /* create a file with a length of <code>fileLen</code> */ |
| static void createFile(MiniDFSCluster cluster, Path filePath, long fileLen, |
| short replicationFactor, int nnIndex) |
| throws IOException, InterruptedException, TimeoutException { |
| FileSystem fs = cluster.getFileSystem(nnIndex); |
| DFSTestUtil.createFile(fs, filePath, fileLen, |
| replicationFactor, r.nextLong()); |
| DFSTestUtil.waitReplication(fs, filePath, replicationFactor); |
| } |
| |
| /* fill up a cluster with <code>numNodes</code> datanodes |
| * whose total used space is <code>size</code> |
| */ |
| private ExtendedBlock[] generateBlocks(Configuration conf, long size, |
| short numNodes) throws IOException, InterruptedException, TimeoutException { |
| cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numNodes).build(); |
| try { |
| cluster.waitActive(); |
| client = NameNodeProxies.createProxy(conf, cluster.getFileSystem(0).getUri(), |
| ClientProtocol.class).getProxy(); |
| |
| short replicationFactor = (short)(numNodes-1); |
| long fileLen = size/replicationFactor; |
| createFile(cluster , filePath, fileLen, replicationFactor, 0); |
| |
| List<LocatedBlock> locatedBlocks = client. |
| getBlockLocations(fileName, 0, fileLen).getLocatedBlocks(); |
| |
| int numOfBlocks = locatedBlocks.size(); |
| ExtendedBlock[] blocks = new ExtendedBlock[numOfBlocks]; |
| for(int i=0; i<numOfBlocks; i++) { |
| ExtendedBlock b = locatedBlocks.get(i).getBlock(); |
| blocks[i] = new ExtendedBlock(b.getBlockPoolId(), b.getBlockId(), b |
| .getNumBytes(), b.getGenerationStamp()); |
| } |
| |
| return blocks; |
| } finally { |
| cluster.shutdown(); |
| } |
| } |
| |
| /* Distribute all blocks according to the given distribution */ |
| static Block[][] distributeBlocks(ExtendedBlock[] blocks, |
| short replicationFactor, final long[] distribution) { |
| // make a copy |
| long[] usedSpace = new long[distribution.length]; |
| System.arraycopy(distribution, 0, usedSpace, 0, distribution.length); |
| |
| List<List<Block>> blockReports = |
| new ArrayList<List<Block>>(usedSpace.length); |
| Block[][] results = new Block[usedSpace.length][]; |
| for(int i=0; i<usedSpace.length; i++) { |
| blockReports.add(new ArrayList<Block>()); |
| } |
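| // Assign each replica to a random node that still has room to fill; a |
| // node's remaining quota may dip below zero by at most one block. |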
| for(int i=0; i<blocks.length; i++) { |
| for(int j=0; j<replicationFactor; j++) { |
| boolean notChosen = true; |
| while(notChosen) { |
| int chosenIndex = r.nextInt(usedSpace.length); |
| if( usedSpace[chosenIndex]>0 ) { |
| notChosen = false; |
| blockReports.get(chosenIndex).add(blocks[i].getLocalBlock()); |
| usedSpace[chosenIndex] -= blocks[i].getNumBytes(); |
| } |
| } |
| } |
| } |
| for(int i=0; i<usedSpace.length; i++) { |
| List<Block> nodeBlockList = blockReports.get(i); |
| results[i] = nodeBlockList.toArray(new Block[nodeBlockList.size()]); |
| } |
| return results; |
| } |
| |
| static long sum(long[] x) { |
| long s = 0L; |
| for(long a : x) { |
| s += a; |
| } |
| return s; |
| } |
| |
| /* We first start a cluster and fill it up to a certain size. |
| * Then we redistribute the blocks according to the required distribution. |
| * Afterwards a balancer is run to balance the cluster. |
| */ |
| private void testUnevenDistribution(Configuration conf, |
| long distribution[], long capacities[], String[] racks) throws Exception { |
| int numDatanodes = distribution.length; |
| if (capacities.length != numDatanodes || racks.length != numDatanodes) { |
| throw new IllegalArgumentException("Array length is not the same"); |
| } |
| |
| // calculate total space that need to be filled |
| final long totalUsedSpace = sum(distribution); |
| |
| // fill the cluster |
| ExtendedBlock[] blocks = generateBlocks(conf, totalUsedSpace, |
| (short) numDatanodes); |
| |
| // redistribute blocks |
| Block[][] blocksDN = distributeBlocks( |
| blocks, (short)(numDatanodes-1), distribution); |
| |
| // restart the cluster: do NOT format the cluster |
| conf.set(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY, "0.0f"); |
| cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDatanodes) |
| .format(false) |
| .racks(racks) |
| .simulatedCapacities(capacities) |
| .build(); |
| cluster.waitActive(); |
| client = NameNodeProxies.createProxy(conf, cluster.getFileSystem(0).getUri(), |
| ClientProtocol.class).getProxy(); |
| |
| for(int i = 0; i < blocksDN.length; i++) |
| cluster.injectBlocks(i, Arrays.asList(blocksDN[i]), null); |
| |
| final long totalCapacity = sum(capacities); |
| runBalancer(conf, totalUsedSpace, totalCapacity); |
| cluster.shutdown(); |
| } |
| |
| /** |
| * Wait until heartbeat gives expected results, within CAPACITY_ALLOWED_VARIANCE, |
| * summed over all nodes. Times out after TIMEOUT msec. |
| * @param expectedUsedSpace |
| * @param expectedTotalSpace |
| * @throws IOException - if getStats() fails |
| * @throws TimeoutException |
| */ |
| static void waitForHeartBeat(long expectedUsedSpace, |
| long expectedTotalSpace, ClientProtocol client, MiniDFSCluster cluster) |
| throws IOException, TimeoutException { |
| long timeout = TIMEOUT; |
| long failtime = (timeout <= 0L) ? Long.MAX_VALUE |
| : Time.monotonicNow() + timeout; |
| |
| while (true) { |
| long[] status = client.getStats(); |
| double totalSpaceVariance = Math.abs((double)status[0] - expectedTotalSpace) |
| / expectedTotalSpace; |
| double usedSpaceVariance = Math.abs((double)status[1] - expectedUsedSpace) |
| / expectedUsedSpace; |
| if (totalSpaceVariance < CAPACITY_ALLOWED_VARIANCE |
| && usedSpaceVariance < CAPACITY_ALLOWED_VARIANCE) |
| break; //done |
| |
| if (Time.monotonicNow() > failtime) { |
| throw new TimeoutException("Cluster failed to reached expected values of " |
| + "totalSpace (current: " + status[0] |
| + ", expected: " + expectedTotalSpace |
| + "), or usedSpace (current: " + status[1] |
| + ", expected: " + expectedUsedSpace |
| + "), in more than " + timeout + " msec."); |
| } |
| try { |
| Thread.sleep(100L); |
| } catch(InterruptedException ignored) { |
| } |
| } |
| } |
| |
| /** |
| * Wait until balanced: each datanode gives utilization within |
| * BALANCE_ALLOWED_VARIANCE of average. |
| * @throws IOException |
| * @throws TimeoutException |
| */ |
| static void waitForBalancer(long totalUsedSpace, long totalCapacity, |
| ClientProtocol client, MiniDFSCluster cluster, BalancerParameters p) |
| throws IOException, TimeoutException { |
| waitForBalancer(totalUsedSpace, totalCapacity, client, cluster, p, 0); |
| } |
| |
| /** |
| * Make sure that the balancer can't move pinned blocks. |
| * If favoredNodes are specified when creating a file, its blocks are |
| * pinned to those nodes via the sticky bit. |
| * @throws Exception |
| */ |
| @Test(timeout=100000) |
| public void testBalancerWithPinnedBlocks() throws Exception { |
| // This test assumes the sticky-bit based block pinning mechanism, which is |
| // available only on Linux/Unix. It can be unblocked on Windows when |
| // HDFS-7759 provides a different mechanism for Windows. |
| assumeTrue(!Path.WINDOWS); |
| |
| final Configuration conf = new HdfsConfiguration(); |
| initConf(conf); |
| conf.setBoolean(DFS_DATANODE_BLOCK_PINNING_ENABLED, true); |
| |
| long[] capacities = new long[] { CAPACITY, CAPACITY }; |
| String[] hosts = {"host0", "host1"}; |
| String[] racks = { RACK0, RACK1 }; |
| int numOfDatanodes = capacities.length; |
| |
| cluster = new MiniDFSCluster.Builder(conf).numDataNodes(capacities.length) |
| .hosts(hosts).racks(racks).simulatedCapacities(capacities).build(); |
| |
| cluster.waitActive(); |
| client = NameNodeProxies.createProxy(conf, |
| cluster.getFileSystem(0).getUri(), ClientProtocol.class).getProxy(); |
| |
| // fill up the cluster to be 80% full |
| long totalCapacity = sum(capacities); |
| long totalUsedSpace = totalCapacity * 8 / 10; |
| InetSocketAddress[] favoredNodes = new InetSocketAddress[numOfDatanodes]; |
| for (int i = 0; i < favoredNodes.length; i++) { |
| // DFSClient will attempt reverse lookup. In case it resolves |
| // "127.0.0.1" to "localhost", we manually specify the hostname. |
| int port = cluster.getDataNodes().get(i).getXferAddress().getPort(); |
| favoredNodes[i] = new InetSocketAddress(hosts[i], port); |
| } |
| |
| DFSTestUtil.createFile(cluster.getFileSystem(0), filePath, false, 1024, |
| totalUsedSpace / numOfDatanodes, DEFAULT_BLOCK_SIZE, |
| (short) numOfDatanodes, 0, false, favoredNodes); |
| |
| // start up an empty node with the same capacity |
| cluster.startDataNodes(conf, 1, true, null, new String[] { RACK2 }, |
| new long[] { CAPACITY }); |
| |
| totalCapacity += CAPACITY; |
| |
| // run balancer and validate results |
| waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster); |
| |
| // start rebalancing |
| Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf); |
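| // Every block is pinned to a favored node, so the balancer cannot move |
| // anything and is expected to exit with NO_MOVE_PROGRESS. |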
| int r = Balancer.run(namenodes, BalancerParameters.DEFAULT, conf); |
| assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r); |
| } |
| |
| /** |
| * Verify balancer won't violate the default block placement policy. |
| * @throws Exception |
| */ |
| @Test(timeout=100000) |
| public void testRackPolicyAfterBalance() throws Exception { |
| final Configuration conf = new HdfsConfiguration(); |
| initConf(conf); |
| long[] capacities = new long[] { CAPACITY, CAPACITY }; |
| String[] hosts = {"host0", "host1"}; |
| String[] racks = { RACK0, RACK1 }; |
| runBalancerAndVerifyBlockPlacementPolicy(conf, capacities, hosts, racks, |
| null, CAPACITY, "host2", RACK1, null); |
| } |
| |
| /** |
| * Verify balancer won't violate upgrade domain block placement policy. |
| * @throws Exception |
| */ |
| @Test(timeout=100000) |
| public void testUpgradeDomainPolicyAfterBalance() throws Exception { |
| final Configuration conf = new HdfsConfiguration(); |
| initConf(conf); |
| conf.setClass(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY, |
| BlockPlacementPolicyWithUpgradeDomain.class, |
| BlockPlacementPolicy.class); |
| long[] capacities = new long[] { CAPACITY, CAPACITY, CAPACITY }; |
| String[] hosts = {"host0", "host1", "host2"}; |
| String[] racks = { RACK0, RACK1, RACK1 }; |
| String[] UDs = { "ud0", "ud1", "ud2" }; |
| runBalancerAndVerifyBlockPlacementPolicy(conf, capacities, hosts, racks, |
| UDs, CAPACITY, "host3", RACK2, "ud2"); |
| } |
| |
| private void runBalancerAndVerifyBlockPlacementPolicy(Configuration conf, |
| long[] capacities, String[] hosts, String[] racks, String[] UDs, |
| long newCapacity, String newHost, String newRack, String newUD) |
| throws Exception { |
| int numOfDatanodes = capacities.length; |
| |
| cluster = new MiniDFSCluster.Builder(conf).numDataNodes(capacities.length) |
| .hosts(hosts).racks(racks).simulatedCapacities(capacities).build(); |
| DatanodeManager dm = cluster.getNamesystem().getBlockManager(). |
| getDatanodeManager(); |
| if (UDs != null) { |
| for(int i = 0; i < UDs.length; i++) { |
| DatanodeID datanodeId = cluster.getDataNodes().get(i).getDatanodeId(); |
| dm.getDatanode(datanodeId).setUpgradeDomain(UDs[i]); |
| } |
| } |
| |
| try { |
| cluster.waitActive(); |
| client = NameNodeProxies.createProxy(conf, |
| cluster.getFileSystem(0).getUri(), ClientProtocol.class).getProxy(); |
| |
| // fill up the cluster to be 80% full |
| long totalCapacity = sum(capacities); |
| long totalUsedSpace = totalCapacity * 8 / 10; |
| |
| final long fileSize = totalUsedSpace / numOfDatanodes; |
| DFSTestUtil.createFile(cluster.getFileSystem(0), filePath, false, 1024, |
| fileSize, DEFAULT_BLOCK_SIZE, (short) numOfDatanodes, 0, false); |
| |
| // start up an empty node with the given capacity on the given rack |
| cluster.startDataNodes(conf, 1, true, null, new String[] { newRack }, |
| new String[] { newHost }, new long[] { newCapacity }); |
| if (newUD != null) { |
| DatanodeID newId = cluster.getDataNodes().get( |
| numOfDatanodes).getDatanodeId(); |
| dm.getDatanode(newId).setUpgradeDomain(newUD); |
| } |
| totalCapacity += newCapacity; |
| |
| // run balancer and validate results |
| waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster); |
| |
| // start rebalancing |
| Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf); |
| Balancer.run(namenodes, BalancerParameters.DEFAULT, conf); |
| BlockPlacementPolicy placementPolicy = |
| cluster.getNamesystem().getBlockManager().getBlockPlacementPolicy(); |
| List<LocatedBlock> locatedBlocks = client. |
| getBlockLocations(fileName, 0, fileSize).getLocatedBlocks(); |
| for (LocatedBlock locatedBlock : locatedBlocks) { |
| BlockPlacementStatus status = placementPolicy.verifyBlockPlacement( |
| locatedBlock.getLocations(), numOfDatanodes); |
| assertTrue(status.isPlacementPolicySatisfied()); |
| } |
| } finally { |
| cluster.shutdown(); |
| } |
| } |
| |
| /** |
| * Wait until balanced: each datanode gives utilization within |
| * BALANCE_ALLOWED_VARIANCE of average. |
| * @param expectedExcludedNodes the number of nodes expected to stay empty |
| * because they are excluded (or not included) by the parameters |
| * @throws IOException |
| * @throws TimeoutException |
| */ |
| static void waitForBalancer(long totalUsedSpace, long totalCapacity, |
| ClientProtocol client, MiniDFSCluster cluster, BalancerParameters p, |
| int expectedExcludedNodes) throws IOException, TimeoutException { |
| long timeout = TIMEOUT; |
| long failtime = (timeout <= 0L) ? Long.MAX_VALUE |
| : Time.monotonicNow() + timeout; |
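| // Adjust the capacity the balancer actually works against: with an include |
| // list only included nodes count; excluded nodes' capacity is subtracted. |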
| if (!p.getIncludedNodes().isEmpty()) { |
| totalCapacity = p.getIncludedNodes().size() * CAPACITY; |
| } |
| if (!p.getExcludedNodes().isEmpty()) { |
| totalCapacity -= p.getExcludedNodes().size() * CAPACITY; |
| } |
| final double avgUtilization = ((double)totalUsedSpace) / totalCapacity; |
| boolean balanced; |
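| // Poll datanode reports until every participating node's utilization is |
| // within BALANCE_ALLOWED_VARIANCE of the cluster average. |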
| do { |
| DatanodeInfo[] datanodeReport = |
| client.getDatanodeReport(DatanodeReportType.ALL); |
| assertEquals(cluster.getDataNodes().size(), datanodeReport.length); |
| balanced = true; |
| int actualExcludedNodeCount = 0; |
| for (DatanodeInfo datanode : datanodeReport) { |
| double nodeUtilization = ((double)datanode.getDfsUsed()) |
| / datanode.getCapacity(); |
| if (Dispatcher.Util.isExcluded(p.getExcludedNodes(), datanode)) { |
| assertTrue(nodeUtilization == 0); |
| actualExcludedNodeCount++; |
| continue; |
| } |
| if (!Dispatcher.Util.isIncluded(p.getIncludedNodes(), datanode)) { |
| assertTrue(nodeUtilization == 0); |
| actualExcludedNodeCount++; |
| continue; |
| } |
| if (Math.abs(avgUtilization - nodeUtilization) > BALANCE_ALLOWED_VARIANCE) { |
| balanced = false; |
| if (Time.monotonicNow() > failtime) { |
| throw new TimeoutException( |
| "Rebalancing expected avg utilization to become " |
| + avgUtilization + ", but on datanode " + datanode |
| + " it remains at " + nodeUtilization |
| + " after more than " + TIMEOUT + " msec."); |
| } |
| try { |
| Thread.sleep(100); |
| } catch (InterruptedException ignored) { |
| } |
| break; |
| } |
| } |
| assertEquals(expectedExcludedNodes, actualExcludedNodeCount); |
| } while (!balanced); |
| } |
| |
| String long2String(long[] array) { |
| if (array.length == 0) { |
| return "<empty>"; |
| } |
| StringBuilder b = new StringBuilder("[").append(array[0]); |
| for(int i = 1; i < array.length; i++) { |
| b.append(", ").append(array[i]); |
| } |
| return b.append("]").toString(); |
| } |
| /** |
| * Class which contains information about the |
| * new nodes to be added to the cluster for balancing. |
| */ |
| static abstract class NewNodeInfo { |
| |
| Set<String> nodesToBeExcluded = new HashSet<String>(); |
| Set<String> nodesToBeIncluded = new HashSet<String>(); |
| |
| abstract String[] getNames(); |
| abstract int getNumberofNewNodes(); |
| abstract int getNumberofIncludeNodes(); |
| abstract int getNumberofExcludeNodes(); |
| |
| public Set<String> getNodesToBeIncluded() { |
| return nodesToBeIncluded; |
| } |
| public Set<String> getNodesToBeExcluded() { |
| return nodesToBeExcluded; |
| } |
| } |
| |
| /** |
| * The host names of the new nodes are specified. |
| */ |
| static class HostNameBasedNodes extends NewNodeInfo { |
| String[] hostnames; |
| |
| public HostNameBasedNodes(String[] hostnames, |
| Set<String> nodesToBeExcluded, Set<String> nodesToBeIncluded) { |
| this.hostnames = hostnames; |
| this.nodesToBeExcluded = nodesToBeExcluded; |
| this.nodesToBeIncluded = nodesToBeIncluded; |
| } |
| |
| @Override |
| String[] getNames() { |
| return hostnames; |
| } |
| @Override |
| int getNumberofNewNodes() { |
| return hostnames.length; |
| } |
| @Override |
| int getNumberofIncludeNodes() { |
| return nodesToBeIncluded.size(); |
| } |
| @Override |
| int getNumberofExcludeNodes() { |
| return nodesToBeExcluded.size(); |
| } |
| } |
| |
| /** |
| * The number of data nodes to be started is specified. |
| * The data nodes will have the same host name but different port numbers. |
| */ |
| static class PortNumberBasedNodes extends NewNodeInfo { |
| int newNodes; |
| int excludeNodes; |
| int includeNodes; |
| |
| public PortNumberBasedNodes(int newNodes, int excludeNodes, int includeNodes) { |
| this.newNodes = newNodes; |
| this.excludeNodes = excludeNodes; |
| this.includeNodes = includeNodes; |
| } |
| |
| @Override |
| String[] getNames() { |
| return null; |
| } |
| @Override |
| int getNumberofNewNodes() { |
| return newNodes; |
| } |
| @Override |
| int getNumberofIncludeNodes() { |
| return includeNodes; |
| } |
| @Override |
| int getNumberofExcludeNodes() { |
| return excludeNodes; |
| } |
| } |
| |
| private void doTest(Configuration conf, long[] capacities, String[] racks, |
| long newCapacity, String newRack, boolean useTool) throws Exception { |
| doTest(conf, capacities, racks, newCapacity, newRack, null, useTool, false); |
| } |
| |
| private void doTest(Configuration conf, long[] capacities, String[] racks, |
| long newCapacity, String newRack, NewNodeInfo nodes, |
| boolean useTool, boolean useFile) throws Exception { |
| doTest(conf, capacities, racks, newCapacity, newRack, nodes, |
| useTool, useFile, false); |
| } |
| |
| /** This test starts a cluster with the specified number of nodes, |
| * fills it to be 30% full (with a single file replicated identically |
| * to all datanodes), then adds one new empty node and starts balancing. |
| * |
| * @param conf - configuration |
| * @param capacities - array of capacities of original nodes in cluster |
| * @param racks - array of racks for original nodes in cluster |
| * @param newCapacity - new node's capacity |
| * @param newRack - new node's rack |
| * @param nodes - information about new nodes to be started. |
| * @param useTool - if true, run the test via the Cli with command-line |
| * argument parsing, etc. Otherwise invoke the balancer API directly. |
| * @param useFile - if true, the hosts to be included or excluded are stored |
| * in a file and later read back from it. |
| * @param useNamesystemSpy - spy on FSNamesystem if true |
| * @throws Exception |
| */ |
| private void doTest(Configuration conf, long[] capacities, |
| String[] racks, long newCapacity, String newRack, NewNodeInfo nodes, |
| boolean useTool, boolean useFile, |
| boolean useNamesystemSpy) throws Exception { |
| LOG.info("capacities = " + long2String(capacities)); |
| LOG.info("racks = " + Arrays.asList(racks)); |
| LOG.info("newCapacity= " + newCapacity); |
| LOG.info("newRack = " + newRack); |
| LOG.info("useTool = " + useTool); |
| assertEquals(capacities.length, racks.length); |
| int numOfDatanodes = capacities.length; |
| |
| try { |
| cluster = new MiniDFSCluster.Builder(conf) |
| .numDataNodes(0) |
| .build(); |
| cluster.getConfiguration(0).setInt(DFSConfigKeys.DFS_REPLICATION_KEY, |
| DFSConfigKeys.DFS_REPLICATION_DEFAULT); |
| conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, |
| DFSConfigKeys.DFS_REPLICATION_DEFAULT); |
| if(useNamesystemSpy) { |
| LOG.info("Using Spy Namesystem"); |
| spyFSNamesystem(cluster.getNameNode()); |
| } |
| cluster.startDataNodes(conf, numOfDatanodes, true, |
| StartupOption.REGULAR, racks, null, capacities, false); |
| cluster.waitClusterUp(); |
| cluster.waitActive(); |
| client = NameNodeProxies.createProxy(conf, |
| cluster.getFileSystem(0).getUri(), ClientProtocol.class).getProxy(); |
| |
| long totalCapacity = sum(capacities); |
| |
| // fill up the cluster to be 30% full |
| long totalUsedSpace = totalCapacity*3/10; |
| createFile(cluster, filePath, totalUsedSpace / numOfDatanodes, |
| (short) numOfDatanodes, 0); |
| |
| if (nodes == null) { // there is no specification of new nodes. |
| // start up an empty node with the same capacity and on the same rack |
| cluster.startDataNodes(conf, 1, true, null, |
| new String[]{newRack}, null,new long[]{newCapacity}); |
| totalCapacity += newCapacity; |
| } else { |
| //if running a test with "include list", include original nodes as well |
| if (nodes.getNumberofIncludeNodes()>0) { |
| for (DataNode dn: cluster.getDataNodes()) |
| nodes.getNodesToBeIncluded().add(dn.getDatanodeId().getHostName()); |
| } |
| String[] newRacks = new String[nodes.getNumberofNewNodes()]; |
| long[] newCapacities = new long[nodes.getNumberofNewNodes()]; |
| for (int i=0; i < nodes.getNumberofNewNodes(); i++) { |
| newRacks[i] = newRack; |
| newCapacities[i] = newCapacity; |
| } |
| // if host names are specified for the new nodes to be created. |
| if (nodes.getNames() != null) { |
| cluster.startDataNodes(conf, nodes.getNumberofNewNodes(), true, null, |
| newRacks, nodes.getNames(), newCapacities); |
| totalCapacity += newCapacity*nodes.getNumberofNewNodes(); |
| } else { // host names are not specified |
| cluster.startDataNodes(conf, nodes.getNumberofNewNodes(), true, null, |
| newRacks, null, newCapacities); |
| totalCapacity += newCapacity*nodes.getNumberofNewNodes(); |
| // populate the include nodes |
| if (nodes.getNumberofIncludeNodes() > 0) { |
| int totalNodes = cluster.getDataNodes().size(); |
| for (int i=0; i < nodes.getNumberofIncludeNodes(); i++) { |
| nodes.getNodesToBeIncluded().add (cluster.getDataNodes().get( |
| totalNodes-1-i).getDatanodeId().getXferAddr()); |
| } |
| } |
| // populate the exclude nodes |
| if (nodes.getNumberofExcludeNodes() > 0) { |
| int totalNodes = cluster.getDataNodes().size(); |
| for (int i=0; i < nodes.getNumberofExcludeNodes(); i++) { |
| nodes.getNodesToBeExcluded().add (cluster.getDataNodes().get( |
| totalNodes-1-i).getDatanodeId().getXferAddr()); |
| } |
| } |
| } |
| } |
| // run balancer and validate results |
| BalancerParameters.Builder pBuilder = |
| new BalancerParameters.Builder(); |
| if (nodes != null) { |
| pBuilder.setExcludedNodes(nodes.getNodesToBeExcluded()); |
| pBuilder.setIncludedNodes(nodes.getNodesToBeIncluded()); |
| pBuilder.setRunDuringUpgrade(false); |
| } |
| BalancerParameters p = pBuilder.build(); |
| |
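| // For verification, nodes left out of an include list count as excluded, |
| // since the balancer must leave them empty either way. |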
| int expectedExcludedNodes = 0; |
| if (nodes != null) { |
| if (!nodes.getNodesToBeExcluded().isEmpty()) { |
| expectedExcludedNodes = nodes.getNodesToBeExcluded().size(); |
| } else if (!nodes.getNodesToBeIncluded().isEmpty()) { |
| expectedExcludedNodes = |
| cluster.getDataNodes().size() - nodes.getNodesToBeIncluded().size(); |
| } |
| } |
| |
| // run balancer and validate results |
| if (useTool) { |
| runBalancerCli(conf, totalUsedSpace, totalCapacity, p, useFile, expectedExcludedNodes); |
| } else { |
| runBalancer(conf, totalUsedSpace, totalCapacity, p, expectedExcludedNodes); |
| } |
| } finally { |
| if(cluster != null) { |
| cluster.shutdown(); |
| } |
| } |
| } |
| |
| private void runBalancer(Configuration conf, long totalUsedSpace, |
| long totalCapacity) throws Exception { |
| runBalancer(conf, totalUsedSpace, totalCapacity, |
| BalancerParameters.DEFAULT, 0); |
| } |
| |
| private void runBalancer(Configuration conf, long totalUsedSpace, |
| long totalCapacity, BalancerParameters p, int excludedNodes) |
| throws Exception { |
| waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster); |
| |
| // start rebalancing |
| Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf); |
| final int r = runBalancer(namenodes, p, conf); |
| if (conf.getInt(DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY, |
| DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT) == 0) { |
| assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r); |
| return; |
| } else { |
| assertEquals(ExitStatus.SUCCESS.getExitCode(), r); |
| } |
| waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster); |
| LOG.info(" ."); |
| waitForBalancer(totalUsedSpace, totalCapacity, client, cluster, p, excludedNodes); |
| } |
| |
| private static int runBalancer(Collection<URI> namenodes, |
| final BalancerParameters p, |
| Configuration conf) throws IOException, InterruptedException { |
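| // Pause between iterations long enough for two heartbeat intervals plus |
| // one replication-monitor pass, so the NameNode's stats can catch up. |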
| final long sleeptime = |
| conf.getLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, |
| DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT) * 2000 + |
| conf.getLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, |
| DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT) * 1000; |
| LOG.info("namenodes = " + namenodes); |
| LOG.info("parameters = " + p); |
| LOG.info("Print stack trace", new Throwable()); |
| |
| System.out.println("Time Stamp Iteration# Bytes Already Moved Bytes Left To Move Bytes Being Moved"); |
| |
| List<NameNodeConnector> connectors = Collections.emptyList(); |
| try { |
| connectors = NameNodeConnector.newNameNodeConnectors(namenodes, |
| Balancer.class.getSimpleName(), Balancer.BALANCER_ID_PATH, conf, |
| BalancerParameters.DEFAULT.getMaxIdleIteration()); |
| |
| boolean done = false; |
| for(int iteration = 0; !done; iteration++) { |
| done = true; |
| Collections.shuffle(connectors); |
| for(NameNodeConnector nnc : connectors) { |
| final Balancer b = new Balancer(nnc, p, conf); |
| final Result r = b.runOneIteration(); |
| r.print(iteration, System.out); |
| |
| // clean all lists |
| b.resetData(conf); |
| if (r.exitStatus == ExitStatus.IN_PROGRESS) { |
| done = false; |
| } else if (r.exitStatus != ExitStatus.SUCCESS) { |
| // must be an error status; return. |
| return r.exitStatus.getExitCode(); |
| } else { |
| if (iteration > 0) { |
| assertTrue(r.bytesAlreadyMoved > 0); |
| } |
| } |
| } |
| |
| if (!done) { |
| Thread.sleep(sleeptime); |
| } |
| } |
| } finally { |
| for(NameNodeConnector nnc : connectors) { |
| IOUtils.cleanup(LOG, nnc); |
| } |
| } |
| return ExitStatus.SUCCESS.getExitCode(); |
| } |
| |
| private void runBalancerCli(Configuration conf, long totalUsedSpace, |
| long totalCapacity, BalancerParameters p, boolean useFile, |
| int expectedExcludedNodes) throws Exception { |
| waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster); |
| List <String> args = new ArrayList<String>(); |
| args.add("-policy"); |
| args.add("datanode"); |
| |
| File excludeHostsFile = null; |
| if (!p.getExcludedNodes().isEmpty()) { |
| args.add("-exclude"); |
| if (useFile) { |
| excludeHostsFile = GenericTestUtils.getTestDir("exclude-hosts-file"); |
| PrintWriter pw = new PrintWriter(excludeHostsFile); |
| for (String host : p.getExcludedNodes()) { |
| pw.write( host + "\n"); |
| } |
| pw.close(); |
| args.add("-f"); |
| args.add(excludeHostsFile.getAbsolutePath()); |
| } else { |
| args.add(StringUtils.join(p.getExcludedNodes(), ',')); |
| } |
| } |
| |
| File includeHostsFile = null; |
| if (!p.getIncludedNodes().isEmpty()) { |
| args.add("-include"); |
| if (useFile) { |
| includeHostsFile = GenericTestUtils.getTestDir("include-hosts-file"); |
| PrintWriter pw = new PrintWriter(includeHostsFile); |
| for (String host : p.getIncludedNodes()) { |
| pw.write( host + "\n"); |
| } |
| pw.close(); |
| args.add("-f"); |
| args.add(includeHostsFile.getAbsolutePath()); |
| } else { |
| args.add(StringUtils.join(p.getIncludedNodes(), ',')); |
| } |
| } |
| |
| final Tool tool = new Cli(); |
| tool.setConf(conf); |
| final int r = tool.run(args.toArray(new String[0])); // start rebalancing |
| |
| assertEquals("Tools should exit 0 on success", 0, r); |
| waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster); |
| LOG.info("Rebalancing with default ctor."); |
| waitForBalancer(totalUsedSpace, totalCapacity, client, cluster, p, expectedExcludedNodes); |
| |
| if (excludeHostsFile != null && excludeHostsFile.exists()) { |
| excludeHostsFile.delete(); |
| } |
| if (includeHostsFile != null && includeHostsFile.exists()) { |
| includeHostsFile.delete(); |
| } |
| } |
| |
| /** one-node cluster test */ |
| private void oneNodeTest(Configuration conf, boolean useTool) throws Exception { |
| // add an empty node with half of the CAPACITY & the same rack |
| doTest(conf, new long[]{CAPACITY}, new String[]{RACK0}, CAPACITY/2, |
| RACK0, useTool); |
| } |
| |
| /** two-node cluster test */ |
| private void twoNodeTest(Configuration conf) throws Exception { |
| doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, |
| CAPACITY, RACK2, false); |
| } |
| |
| /** test using a user-supplied conf */ |
| public void integrationTest(Configuration conf) throws Exception { |
| initConf(conf); |
| oneNodeTest(conf, false); |
| } |
| |
| @Test(timeout = 100000) |
| public void testUnknownDatanodeSimple() throws Exception { |
| Configuration conf = new HdfsConfiguration(); |
| initConf(conf); |
| testUnknownDatanode(conf); |
| } |
| |
| /* We first start a cluster and fill it up to a certain size, |
| * then redistribute the blocks according to the required distribution |
| * and start an empty datanode. |
| * Afterwards a balancer is run to balance the cluster. |
| * A partially filled datanode is excluded during balancing. |
| * This triggers a situation where one of the block's locations is unknown. |
| */ |
| private void testUnknownDatanode(Configuration conf) |
| throws IOException, InterruptedException, TimeoutException { |
| long distribution[] = new long[] {50*CAPACITY/100, 70*CAPACITY/100, 0*CAPACITY/100}; |
| long capacities[] = new long[]{CAPACITY, CAPACITY, CAPACITY}; |
| String racks[] = new String[] {RACK0, RACK1, RACK1}; |
| |
| int numDatanodes = distribution.length; |
| if (capacities.length != numDatanodes || racks.length != numDatanodes) { |
| throw new IllegalArgumentException("Array length is not the same"); |
| } |
| |
| // calculate total space that need to be filled |
| final long totalUsedSpace = sum(distribution); |
| |
| // fill the cluster |
| ExtendedBlock[] blocks = generateBlocks(conf, totalUsedSpace, |
| (short) numDatanodes); |
| |
| // redistribute blocks |
| Block[][] blocksDN = distributeBlocks( |
| blocks, (short)(numDatanodes-1), distribution); |
| |
| // restart the cluster: do NOT format the cluster |
| conf.set(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY, "0.0f"); |
| cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDatanodes) |
| .format(false) |
| .racks(racks) |
| .simulatedCapacities(capacities) |
| .build(); |
| cluster.waitActive(); |
| client = NameNodeProxies.createProxy(conf, cluster.getFileSystem(0).getUri(), |
| ClientProtocol.class).getProxy(); |
| |
| for(int i = 0; i < 3; i++) { |
| cluster.injectBlocks(i, Arrays.asList(blocksDN[i]), null); |
| } |
| |
| cluster.startDataNodes(conf, 1, true, null, |
| new String[]{RACK0}, null,new long[]{CAPACITY}); |
| cluster.triggerHeartbeats(); |
| |
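| // Exclude the first (partially filled) datanode; this leaves the balancer |
| // with block locations it cannot use, yet it must still exit with SUCCESS. |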
| Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf); |
| Set<String> datanodes = new HashSet<String>(); |
| datanodes.add(cluster.getDataNodes().get(0).getDatanodeId().getHostName()); |
| BalancerParameters.Builder pBuilder = |
| new BalancerParameters.Builder(); |
| pBuilder.setExcludedNodes(datanodes); |
| pBuilder.setRunDuringUpgrade(false); |
| final int r = Balancer.run(namenodes, pBuilder.build(), conf); |
| assertEquals(ExitStatus.SUCCESS.getExitCode(), r); |
| } |
| |
| /** |
| * Test parse method in Balancer#Cli class with threshold value out of |
| * boundaries. |
| */ |
| @Test(timeout=100000) |
| public void testBalancerCliParseWithThresholdOutOfBoundaries() { |
| String parameters[] = new String[] { "-threshold", "0" }; |
| String reason = "IllegalArgumentException is expected when threshold value" |
| + " is out of boundary."; |
| try { |
| Balancer.Cli.parse(parameters); |
| fail(reason); |
| } catch (IllegalArgumentException e) { |
| assertEquals("Number out of range: threshold = 0.0", e.getMessage()); |
| } |
| parameters = new String[] { "-threshold", "101" }; |
| try { |
| Balancer.Cli.parse(parameters); |
| fail(reason); |
| } catch (IllegalArgumentException e) { |
| assertEquals("Number out of range: threshold = 101.0", e.getMessage()); |
| } |
| } |
| |
| /** Test a cluster with even distribution, |
| * then a new empty node is added to the cluster */ |
| @Test(timeout=100000) |
| public void testBalancer0() throws Exception { |
| testBalancer0Internal(new HdfsConfiguration()); |
| } |
| |
| void testBalancer0Internal(Configuration conf) throws Exception { |
| initConf(conf); |
| oneNodeTest(conf, false); |
| twoNodeTest(conf); |
| } |
| |
| /** Test unevenly distributed cluster */ |
| @Test(timeout=100000) |
| public void testBalancer1() throws Exception { |
| testBalancer1Internal(new HdfsConfiguration()); |
| } |
| |
| void testBalancer1Internal(Configuration conf) throws Exception { |
| initConf(conf); |
| testUnevenDistribution(conf, |
| new long[] {50*CAPACITY/100, 10*CAPACITY/100}, |
| new long[]{CAPACITY, CAPACITY}, |
| new String[] {RACK0, RACK1}); |
| } |
| |
| @Test(expected=HadoopIllegalArgumentException.class) |
| public void testBalancerWithZeroThreadsForMove() throws Exception { |
| Configuration conf = new HdfsConfiguration(); |
| conf.setInt(DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY, 0); |
| testBalancer1Internal (conf); |
| } |
| |
| @Test(timeout=100000) |
| public void testBalancerWithNonZeroThreadsForMove() throws Exception { |
| Configuration conf = new HdfsConfiguration(); |
| conf.setInt(DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY, 8); |
| testBalancer1Internal (conf); |
| } |
| |
| @Test(timeout=100000) |
| public void testBalancer2() throws Exception { |
| testBalancer2Internal(new HdfsConfiguration()); |
| } |
| |
| void testBalancer2Internal(Configuration conf) throws Exception { |
| initConf(conf); |
| testBalancerDefaultConstructor(conf, new long[] { CAPACITY, CAPACITY }, |
| new String[] { RACK0, RACK1 }, CAPACITY, RACK2); |
| } |
| |
| private void testBalancerDefaultConstructor(Configuration conf, |
| long[] capacities, String[] racks, long newCapacity, String newRack) |
| throws Exception { |
| int numOfDatanodes = capacities.length; |
| assertEquals(numOfDatanodes, racks.length); |
| cluster = new MiniDFSCluster.Builder(conf) |
| .numDataNodes(capacities.length) |
| .racks(racks) |
| .simulatedCapacities(capacities) |
| .build(); |
| try { |
| cluster.waitActive(); |
| client = NameNodeProxies.createProxy(conf, cluster.getFileSystem(0).getUri(), |
| ClientProtocol.class).getProxy(); |
| |
| long totalCapacity = sum(capacities); |
| |
| // fill up the cluster to be 30% full |
| long totalUsedSpace = totalCapacity * 3 / 10; |
| createFile(cluster, filePath, totalUsedSpace / numOfDatanodes, |
| (short) numOfDatanodes, 0); |
| // start up an empty node with the same capacity and on the same rack |
| cluster.startDataNodes(conf, 1, true, null, new String[] { newRack }, |
| new long[] { newCapacity }); |
| |
| totalCapacity += newCapacity; |
| |
| // run balancer and validate results |
| runBalancer(conf, totalUsedSpace, totalCapacity); |
| } finally { |
| cluster.shutdown(); |
| } |
| } |
| |
| /** |
| * Verify balancer exits 0 on success. |
| */ |
| @Test(timeout=100000) |
| public void testExitZeroOnSuccess() throws Exception { |
| final Configuration conf = new HdfsConfiguration(); |
| |
| initConf(conf); |
| |
| oneNodeTest(conf, true); |
| } |
| |
| /** |
| * Test parse method in Balancer#Cli class with wrong number of params |
| */ |
| |
| @Test |
| public void testBalancerCliParseWithWrongParams() { |
| String parameters[] = new String[] { "-threshold" }; |
| String reason = |
| "IllegalArgumentException is expected when value is not specified"; |
| try { |
| Balancer.Cli.parse(parameters); |
| fail(reason); |
| } catch (IllegalArgumentException e) { |
| |
| } |
| parameters = new String[] { "-policy" }; |
| try { |
| Balancer.Cli.parse(parameters); |
| fail(reason); |
| } catch (IllegalArgumentException e) { |
| |
| } |
| parameters = new String[] {"-threshold", "1", "-policy"}; |
| try { |
| Balancer.Cli.parse(parameters); |
| fail(reason); |
| } catch (IllegalArgumentException e) { |
| |
| } |
| parameters = new String[] {"-threshold", "1", "-include"}; |
| try { |
| Balancer.Cli.parse(parameters); |
| fail(reason); |
| } catch (IllegalArgumentException e) { |
| |
| } |
| parameters = new String[] {"-threshold", "1", "-exclude"}; |
| try { |
| Balancer.Cli.parse(parameters); |
| fail(reason); |
| } catch (IllegalArgumentException e) { |
| |
| } |
| parameters = new String[] {"-include", "-f"}; |
| try { |
| Balancer.Cli.parse(parameters); |
| fail(reason); |
| } catch (IllegalArgumentException e) { |
| |
| } |
| parameters = new String[] {"-exclude", "-f"}; |
| try { |
| Balancer.Cli.parse(parameters); |
| fail(reason); |
| } catch (IllegalArgumentException e) { |
| |
| } |
| |
| parameters = new String[] {"-include", "testnode1", "-exclude", "testnode2"}; |
| try { |
| Balancer.Cli.parse(parameters); |
| fail("IllegalArgumentException is expected when both -exclude and -include are specified"); |
| } catch (IllegalArgumentException e) { |
| |
| } |
| |
| parameters = new String[] { "-blockpools" }; |
| try { |
| Balancer.Cli.parse(parameters); |
| fail("IllegalArgumentException is expected when a value " |
| + "is not specified for the blockpool flag"); |
| } catch (IllegalArgumentException e) { |
| |
| } |
| |
| parameters = new String[] {"-source"}; |
| try { |
| Balancer.Cli.parse(parameters); |
| fail(reason + " for -source parameter"); |
| } catch (IllegalArgumentException ignored) { |
| // expected |
| } |
| } |
| |
| @Test |
| public void testBalancerCliParseBlockpools() { |
| String[] parameters = new String[] { "-blockpools", "bp-1,bp-2,bp-3" }; |
| BalancerParameters p = Balancer.Cli.parse(parameters); |
| assertEquals(3, p.getBlockPools().size()); |
| |
| parameters = new String[] { "-blockpools", "bp-1" }; |
| p = Balancer.Cli.parse(parameters); |
| assertEquals(1, p.getBlockPools().size()); |
| |
| parameters = new String[] { "-blockpools", "bp-1,,bp-2" }; |
| p = Balancer.Cli.parse(parameters); |
| assertEquals(3, p.getBlockPools().size()); |
| |
| parameters = new String[] { "-blockpools", "bp-1," }; |
| p = Balancer.Cli.parse(parameters); |
| assertEquals(1, p.getBlockPools().size()); |
| } |
| |
| |
| /** |
| * Test a cluster with even distribution, |
| * then three nodes are added to the cluster, |
| * runs balancer with two of the nodes in the exclude list |
| */ |
| @Test(timeout=100000) |
| public void testBalancerWithExcludeList() throws Exception { |
| final Configuration conf = new HdfsConfiguration(); |
| initConf(conf); |
| Set<String> excludeHosts = new HashSet<String>(); |
| excludeHosts.add( "datanodeY"); |
| excludeHosts.add( "datanodeZ"); |
| doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2, |
| new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"}, |
| excludeHosts, BalancerParameters.DEFAULT.getIncludedNodes()), |
| false, false); |
| } |
| |
| /** |
| * Test a cluster with even distribution, |
| * then three nodes are added to the cluster, |
| * runs balancer with two of the nodes in the exclude list |
| */ |
| @Test(timeout=100000) |
| public void testBalancerWithExcludeListWithPorts() throws Exception { |
| final Configuration conf = new HdfsConfiguration(); |
| initConf(conf); |
| doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, |
| CAPACITY, RACK2, new PortNumberBasedNodes(3, 2, 0), false, false); |
| } |
| |
| /** |
| * Test a cluster with even distribution, |
| * then three nodes are added to the cluster, |
| * runs balancer with two of the nodes in the exclude list |
| */ |
| @Test(timeout=100000) |
| public void testBalancerCliWithExcludeList() throws Exception { |
| final Configuration conf = new HdfsConfiguration(); |
| initConf(conf); |
| Set<String> excludeHosts = new HashSet<String>(); |
| excludeHosts.add( "datanodeY"); |
| excludeHosts.add( "datanodeZ"); |
| doTest(conf, new long[] { CAPACITY, CAPACITY }, |
| new String[] { RACK0, RACK1 }, CAPACITY, RACK2, new HostNameBasedNodes( |
| new String[] { "datanodeX", "datanodeY", "datanodeZ" }, |
| excludeHosts, BalancerParameters.DEFAULT.getIncludedNodes()), true, |
| false); |
| } |
| |
| /** |
| * Test a cluster with even distribution, |
| * then three nodes are added to the cluster, |
| * runs balancer with two of the nodes in the exclude list |
| */ |
| @Test(timeout=100000) |
| public void testBalancerCliWithExcludeListWithPorts() throws Exception { |
| final Configuration conf = new HdfsConfiguration(); |
| initConf(conf); |
| doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, |
| CAPACITY, RACK2, new PortNumberBasedNodes(3, 2, 0), true, false); |
| } |
| |
| /** |
| * Test a cluster with even distribution, |
| * then three nodes are added to the cluster, |
| * runs balancer with two of the nodes in the exclude list in a file |
| */ |
| @Test(timeout=100000) |
| public void testBalancerCliWithExcludeListInAFile() throws Exception { |
| final Configuration conf = new HdfsConfiguration(); |
| initConf(conf); |
| Set<String> excludeHosts = new HashSet<String>(); |
| excludeHosts.add( "datanodeY"); |
| excludeHosts.add( "datanodeZ"); |
| doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2, |
| new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"}, |
| excludeHosts, BalancerParameters.DEFAULT.getIncludedNodes()), true, |
| true); |
| } |
| |
| /** |
| * Test a cluster with even distribution, |
| * then three nodes are added to the cluster, |
| * runs balancer with two of the nodes in the exclude list |
| */ |
| @Test(timeout=100000) |
| public void testBalancerCliWithExcludeListWithPortsInAFile() throws Exception { |
| final Configuration conf = new HdfsConfiguration(); |
| initConf(conf); |
| doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, |
| CAPACITY, RACK2, new PortNumberBasedNodes(3, 2, 0), true, true); |
| } |
| |
| /** |
| * Test a cluster with even distribution, |
| * then three nodes are added to the cluster, |
| * runs balancer with two of the nodes in the include list |
| */ |
| @Test(timeout=100000) |
| public void testBalancerWithIncludeList() throws Exception { |
| final Configuration conf = new HdfsConfiguration(); |
| initConf(conf); |
| Set<String> includeHosts = new HashSet<String>(); |
| includeHosts.add( "datanodeY"); |
| doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2, |
| new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"}, |
| BalancerParameters.DEFAULT.getExcludedNodes(), includeHosts), |
| false, false); |
| } |
| |
| /** |
| * Test a cluster with even distribution, |
| * then three nodes are added to the cluster, |
| * runs balancer with two of the nodes in the include list |
| */ |
| @Test(timeout=100000) |
| public void testBalancerWithIncludeListWithPorts() throws Exception { |
| final Configuration conf = new HdfsConfiguration(); |
| initConf(conf); |
| doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, |
| CAPACITY, RACK2, new PortNumberBasedNodes(3, 0, 1), false, false); |
| } |
| |
| /** |
| * Test a cluster with even distribution, |
| * then three nodes are added to the cluster, |
| * runs balancer with two of the nodes in the include list |
| */ |
| @Test(timeout=100000) |
| public void testBalancerCliWithIncludeList() throws Exception { |
| final Configuration conf = new HdfsConfiguration(); |
| initConf(conf); |
| Set<String> includeHosts = new HashSet<String>();
| includeHosts.add("datanodeY");
| doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2, |
| new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"}, |
| BalancerParameters.DEFAULT.getExcludedNodes(), includeHosts), true, |
| false); |
| } |
| |
| /**
| * Test a cluster with even distribution.
| * Three nodes are then added to the cluster,
| * and the balancer CLI is run with one of the new nodes
| * (identified by host:port) in the include list.
| */
| @Test(timeout=100000) |
| public void testBalancerCliWithIncludeListWithPorts() throws Exception { |
| final Configuration conf = new HdfsConfiguration(); |
| initConf(conf); |
| doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, |
| CAPACITY, RACK2, new PortNumberBasedNodes(3, 0, 1), true, false); |
| } |
| |
| /**
| * Test a cluster with even distribution.
| * Three nodes are then added to the cluster,
| * and the balancer CLI is run with one of the new nodes
| * in an include list read from a file.
| */
| @Test(timeout=100000) |
| public void testBalancerCliWithIncludeListInAFile() throws Exception { |
| final Configuration conf = new HdfsConfiguration(); |
| initConf(conf); |
| Set<String> includeHosts = new HashSet<String>();
| includeHosts.add("datanodeY");
| doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2, |
| new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"}, |
| BalancerParameters.DEFAULT.getExcludedNodes(), includeHosts), true, |
| true); |
| } |
| |
| /**
| * Test a cluster with even distribution.
| * Three nodes are then added to the cluster,
| * and the balancer CLI is run with one of the new nodes
| * (identified by host:port) in an include list read from a file.
| */
| @Test(timeout=100000) |
| public void testBalancerCliWithIncludeListWithPortsInAFile() throws Exception { |
| final Configuration conf = new HdfsConfiguration(); |
| initConf(conf); |
| doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, |
| CAPACITY, RACK2, new PortNumberBasedNodes(3, 0, 1), true, true); |
| } |
| |
| /**
| * Test the balancer with RAM_DISK configured.
| * One DN has two files on RAM_DISK, the other DN has no files on RAM_DISK.
| * Verify that the balancer does not migrate RAM_DISK replicas between DNs.
| */
| @Test(timeout=300000) |
| public void testBalancerWithRamDisk() throws Exception { |
| final int SEED = 0xFADED; |
| final short REPL_FACT = 1; |
| Configuration conf = new Configuration(); |
| |
| final int defaultRamDiskCapacity = 10; |
| final long ramDiskStorageLimit = |
| ((long) defaultRamDiskCapacity * DEFAULT_RAM_DISK_BLOCK_SIZE) + |
| (DEFAULT_RAM_DISK_BLOCK_SIZE - 1); |
| final long diskStorageLimit = |
| ((long) defaultRamDiskCapacity * DEFAULT_RAM_DISK_BLOCK_SIZE) + |
| (DEFAULT_RAM_DISK_BLOCK_SIZE - 1); |
| |
| initConfWithRamDisk(conf, ramDiskStorageLimit); |
| |
| cluster = new MiniDFSCluster |
| .Builder(conf) |
| .numDataNodes(1) |
| .storageCapacities(new long[] { ramDiskStorageLimit, diskStorageLimit }) |
| .storageTypes(new StorageType[] { RAM_DISK, DEFAULT }) |
| .build(); |
| |
| cluster.waitActive(); |
| // Create a few files on RAM_DISK
| final String METHOD_NAME = GenericTestUtils.getMethodName(); |
| final Path path1 = new Path("/" + METHOD_NAME + ".01.dat"); |
| final Path path2 = new Path("/" + METHOD_NAME + ".02.dat"); |
| |
| DistributedFileSystem fs = cluster.getFileSystem(); |
| DFSClient client = fs.getClient(); |
| DFSTestUtil.createFile(fs, path1, true, |
| DEFAULT_RAM_DISK_BLOCK_SIZE, 4 * DEFAULT_RAM_DISK_BLOCK_SIZE, |
| DEFAULT_RAM_DISK_BLOCK_SIZE, REPL_FACT, SEED, true); |
| DFSTestUtil.createFile(fs, path2, true, |
| DEFAULT_RAM_DISK_BLOCK_SIZE, 1 * DEFAULT_RAM_DISK_BLOCK_SIZE, |
| DEFAULT_RAM_DISK_BLOCK_SIZE, REPL_FACT, SEED, true); |
| |
| // Sleep for a short time to allow the lazy writer thread to do its job |
| Thread.sleep(6 * 1000); |
| |
| // Add a fresh DN with the same storage types and capacities, but with no files on RAM_DISK
| StorageType[][] storageTypes = new StorageType[][] {{RAM_DISK, DEFAULT}}; |
| long[][] storageCapacities = new long[][]{{ramDiskStorageLimit, |
| diskStorageLimit}}; |
| cluster.startDataNodes(conf, REPL_FACT, storageTypes, true, null, |
| null, null, storageCapacities, null, false, false, false, null); |
| |
| cluster.triggerHeartbeats(); |
| Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf); |
| |
| // Run Balancer |
| final BalancerParameters p = BalancerParameters.DEFAULT; |
| final int r = Balancer.run(namenodes, p, conf); |
| |
| // Validate that no RAM_DISK blocks were moved
| assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r); |
| |
| // Verify files are still on RAM_DISK |
| DFSTestUtil.verifyFileReplicasOnStorageType(fs, client, path1, RAM_DISK); |
| DFSTestUtil.verifyFileReplicasOnStorageType(fs, client, path2, RAM_DISK); |
| } |
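| 
| // For reference: the lazy-persist files above are created via DFSTestUtil,
| // but the same effect is available through the public FileSystem.create
| // overload that takes CreateFlag.LAZY_PERSIST. A minimal sketch of a
| // hypothetical helper (not used by the tests; fully-qualified names are
| // used to avoid adding imports):
| private static void createLazyPersistFile(DistributedFileSystem fs, Path p)
| throws IOException {
| try (FSDataOutputStream out = fs.create(p, FsPermission.getFileDefault(),
| java.util.EnumSet.of(org.apache.hadoop.fs.CreateFlag.CREATE,
| org.apache.hadoop.fs.CreateFlag.LAZY_PERSIST),
| 4096, (short) 1, DEFAULT_RAM_DISK_BLOCK_SIZE, null)) {
| // Write one block; with LAZY_PERSIST it is placed on RAM_DISK first and
| // persisted to disk later by the lazy writer thread.
| out.write(new byte[(int) DEFAULT_RAM_DISK_BLOCK_SIZE]);
| }
| }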
| |
| /** |
| * Check that the balancer exits when there is an unfinalized upgrade. |
| */ |
| @Test(timeout=300000) |
| public void testBalancerDuringUpgrade() throws Exception { |
| final int SEED = 0xFADED; |
| Configuration conf = new HdfsConfiguration(); |
| conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, 1); |
| conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 500); |
| conf.setLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1); |
| |
| conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1L); |
| |
| final int BLOCK_SIZE = 1024*1024; |
| cluster = new MiniDFSCluster |
| .Builder(conf) |
| .numDataNodes(1) |
| .storageCapacities(new long[] { BLOCK_SIZE * 10 }) |
| .storageTypes(new StorageType[] { DEFAULT }) |
| .storagesPerDatanode(1) |
| .build(); |
| cluster.waitActive(); |
| // Create a file on the single DN |
| final String METHOD_NAME = GenericTestUtils.getMethodName(); |
| final Path path1 = new Path("/" + METHOD_NAME + ".01.dat"); |
| |
| DistributedFileSystem fs = cluster.getFileSystem(); |
| DFSTestUtil.createFile(fs, path1, BLOCK_SIZE, BLOCK_SIZE * 2, BLOCK_SIZE, |
| (short) 1, SEED); |
| |
| // Add another DN with the same capacity; the cluster is now unbalanced
| cluster.startDataNodes(conf, 1, true, null, null); |
| cluster.triggerHeartbeats(); |
| Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf); |
| |
| // Run balancer |
| final BalancerParameters p = BalancerParameters.DEFAULT; |
| |
| fs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_ENTER); |
| fs.rollingUpgrade(HdfsConstants.RollingUpgradeAction.PREPARE); |
| fs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE); |
| |
| // Rolling upgrade should abort the balancer |
| assertEquals(ExitStatus.UNFINALIZED_UPGRADE.getExitCode(), |
| Balancer.run(namenodes, p, conf)); |
| |
| // Should work with the -runDuringUpgrade flag. |
| BalancerParameters.Builder b = |
| new BalancerParameters.Builder(); |
| b.setRunDuringUpgrade(true); |
| final BalancerParameters runDuringUpgrade = b.build(); |
| assertEquals(ExitStatus.SUCCESS.getExitCode(), |
| Balancer.run(namenodes, runDuringUpgrade, conf)); |
| |
| // Finalize the rolling upgrade |
| fs.rollingUpgrade(HdfsConstants.RollingUpgradeAction.FINALIZE); |
| |
| // Should also work after finalization. |
| assertEquals(ExitStatus.SUCCESS.getExitCode(), |
| Balancer.run(namenodes, p, conf)); |
| } |
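| 
| // The -runDuringUpgrade behavior exercised above is also reachable from the
| // command line. A minimal sketch of the equivalent CLI parse, assuming the
| // flag is accepted by Balancer.Cli.parse as listed in the balancer usage
| // text (hypothetical helper, not used by the tests):
| private static BalancerParameters parseRunDuringUpgradeFlag() {
| final BalancerParameters p =
| Balancer.Cli.parse(new String[] { "-runDuringUpgrade" });
| // The flag should map directly onto the runDuringUpgrade parameter.
| assertTrue(p.getRunDuringUpgrade());
| return p;
| }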
| |
| /**
| * Test a special case: two replicas of the same block must not end up
| * on the same node.
| * We have 2 nodes and a block with replicas in (DN0,SSD) and (DN1,DISK).
| * The replica in (DN0,SSD) must not be moved to (DN1,SSD);
| * otherwise DN1 would hold 2 replicas of the block.
| */
| @Test(timeout=100000) |
| public void testTwoReplicaShouldNotInSameDN() throws Exception { |
| final Configuration conf = new HdfsConfiguration(); |
| |
| int blockSize = 5 * 1024 * 1024;
| conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize); |
| conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L); |
| conf.setLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1L); |
| |
| conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1L); |
| |
| int numOfDatanodes = 2;
| cluster = new MiniDFSCluster.Builder(conf) |
| .numDataNodes(2) |
| .racks(new String[]{"/default/rack0", "/default/rack0"}) |
| .storagesPerDatanode(2) |
| .storageTypes(new StorageType[][]{ |
| {StorageType.SSD, StorageType.DISK}, |
| {StorageType.SSD, StorageType.DISK}}) |
| .storageCapacities(new long[][]{ |
| {100 * blockSize, 20 * blockSize}, |
| {20 * blockSize, 100 * blockSize}}) |
| .build(); |
| cluster.waitActive(); |
| |
| // Set the "/bar" directory to the ONE_SSD storage policy.
| DistributedFileSystem fs = cluster.getFileSystem();
| Path barDir = new Path("/bar");
| fs.mkdir(barDir, new FsPermission((short) 0777));
| fs.setStoragePolicy(barDir, HdfsConstants.ONESSD_STORAGE_POLICY_NAME);
| |
| // Insert 30 blocks. So (DN0,SSD) and (DN1,DISK) are about half full,
| // and (DN0,DISK) and (DN1,SSD) are about 15% full.
| long fileLen = 30 * blockSize;
| // fooFile has the ONE_SSD policy, so the two replicas of each block are
| // held either by (DN0,SSD) and (DN1,DISK), or by (DN0,DISK) and (DN1,SSD).
| Path fooFile = new Path(barDir, "foo"); |
| createFile(cluster, fooFile, fileLen, (short) numOfDatanodes, 0); |
| // update space info |
| cluster.triggerHeartbeats(); |
| |
| BalancerParameters p = BalancerParameters.DEFAULT; |
| Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf); |
| final int r = Balancer.run(namenodes, p, conf); |
| |
| // The replica in (DN0,SSD) is not moved to (DN1,SSD) because (DN1,DISK)
| // already holds a replica of the same block; otherwise DN1 would hold
| // 2 replicas. For the same reason, no replicas can be moved at all.
| assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r); |
| } |
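| 
| // A small hypothetical helper (not used by the tests) that makes the
| // placement reasoning above observable: it logs each replica's datanode and
| // storage type for the first block of a file. Fully-qualified protocol types
| // are used to avoid adding imports; it assumes the standard
| // DFSClient.getLocatedBlocks API.
| private static void logReplicaStorageTypes(DFSClient dfsClient, String path)
| throws IOException {
| final org.apache.hadoop.hdfs.protocol.LocatedBlock lb =
| dfsClient.getLocatedBlocks(path, 0).get(0);
| final StorageType[] types = lb.getStorageTypes();
| final org.apache.hadoop.hdfs.protocol.DatanodeInfo[] locations =
| lb.getLocations();
| for (int i = 0; i < locations.length; i++) {
| // e.g. "127.0.0.1:9866 -> SSD"
| LOG.info(locations[i].getXferAddr() + " -> " + types[i]);
| }
| }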
| |
| /**
| * Test running multiple balancers simultaneously.
| *
| * Case-1: While the first balancer is running, starting a second one should
| * fail immediately with an "Another balancer is running. Exiting.."
| * IOException.
| *
| * Case-2: When the second balancer starts, the 'balancer.id' file exists but
| * its lease does not. The second balancer should then run successfully.
| */
| @Test(timeout = 100000) |
| public void testManyBalancerSimultaneously() throws Exception { |
| final Configuration conf = new HdfsConfiguration(); |
| initConf(conf); |
| // Add an empty node with half the capacity (2 * CAPACITY) of the existing
| // node (4 * CAPACITY), on the same rack
| long[] capacities = new long[] { 4 * CAPACITY }; |
| String[] racks = new String[] { RACK0 }; |
| long newCapacity = 2 * CAPACITY; |
| String newRack = RACK0; |
| LOG.info("capacities = " + long2String(capacities)); |
| LOG.info("racks = " + Arrays.asList(racks)); |
| LOG.info("newCapacity= " + newCapacity); |
| LOG.info("newRack = " + newRack); |
| LOG.info("useTool = " + false); |
| assertEquals(capacities.length, racks.length); |
| int numOfDatanodes = capacities.length; |
| cluster = new MiniDFSCluster.Builder(conf).numDataNodes(capacities.length) |
| .racks(racks).simulatedCapacities(capacities).build(); |
| cluster.waitActive(); |
| client = NameNodeProxies.createProxy(conf, |
| cluster.getFileSystem(0).getUri(), ClientProtocol.class).getProxy(); |
| |
| long totalCapacity = sum(capacities); |
| |
| // fill up the cluster to be 30% full |
| final long totalUsedSpace = totalCapacity * 3 / 10; |
| createFile(cluster, filePath, totalUsedSpace / numOfDatanodes, |
| (short) numOfDatanodes, 0); |
| // start up an empty node with the same capacity and on the same rack |
| cluster.startDataNodes(conf, 1, true, null, new String[] { newRack }, |
| new long[] { newCapacity }); |
| |
| // Case1: Simulate the first balancer by creating the 'balancer.id' file.
| // The balancer keeps this file open until the balancing operation completes.
| FileSystem fs = cluster.getFileSystem(0); |
| final FSDataOutputStream out = fs |
| .create(Balancer.BALANCER_ID_PATH, false); |
| out.writeBytes(InetAddress.getLocalHost().getHostName()); |
| out.hflush(); |
| assertTrue("'balancer.id' file doesn't exist!", |
| fs.exists(Balancer.BALANCER_ID_PATH)); |
| |
| // start second balancer |
| final String[] args = { "-policy", "datanode" }; |
| final Tool tool = new Cli(); |
| tool.setConf(conf); |
| int exitCode = tool.run(args); // start balancing |
| assertEquals("Exit status code mismatches", |
| ExitStatus.IO_EXCEPTION.getExitCode(), exitCode); |
| |
| // Case2: Release the lease so that another balancer can perform
| // the balancing.
| out.close(); |
| assertTrue("'balancer.id' file doesn't exist!", |
| fs.exists(Balancer.BALANCER_ID_PATH)); |
| exitCode = tool.run(args); // start balancing |
| assertEquals("Exit status code mismatches", |
| ExitStatus.SUCCESS.getExitCode(), exitCode); |
| } |
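| 
| // A minimal sketch (hypothetical helper, not part of the Balancer API) of
| // the mutual exclusion exercised above: whichever process creates
| // 'balancer.id' exclusively, and keeps the stream open, holds the lease
| // that locks out other balancers.
| private static FSDataOutputStream tryAcquireBalancerLock(FileSystem fs) {
| try {
| final FSDataOutputStream out =
| fs.create(Balancer.BALANCER_ID_PATH, false /* do not overwrite */);
| out.writeBytes(InetAddress.getLocalHost().getHostName());
| out.hflush();
| return out; // the caller must keep this stream open to retain the lease
| } catch (IOException e) {
| return null; // another balancer already holds balancer.id
| }
| }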
| |
| /** Balancer should not move blocks with size < minBlockSize. */ |
| @Test(timeout=60000) |
| public void testMinBlockSizeAndSourceNodes() throws Exception { |
| final Configuration conf = new HdfsConfiguration(); |
| initConf(conf); |
| |
| final short replication = 3; |
| final long[] lengths = {10, 10, 10, 10}; |
| final long[] capacities = new long[replication]; |
| final long totalUsed = capacities.length * sum(lengths); |
| Arrays.fill(capacities, 1000); |
| |
| cluster = new MiniDFSCluster.Builder(conf) |
| .numDataNodes(capacities.length) |
| .simulatedCapacities(capacities) |
| .build(); |
| final DistributedFileSystem dfs = cluster.getFileSystem(); |
| cluster.waitActive(); |
| client = NameNodeProxies.createProxy(conf, dfs.getUri(), |
| ClientProtocol.class).getProxy(); |
| |
| // create small files; every block (at most 10 bytes) is smaller than the
| // min-block-size of 50 used in the first balancer run below
| for(int i = 0; i < lengths.length; i++) { |
| final long size = lengths[i]; |
| final Path p = new Path("/file" + i + "_size" + size); |
| try(final OutputStream out = dfs.create(p)) { |
| for(int j = 0; j < size; j++) { |
| out.write(j); |
| } |
| } |
| } |
| |
| // start up empty nodes with the same capacities
| cluster.startDataNodes(conf, capacities.length, true, null, null, capacities);
| LOG.info("capacities = " + Arrays.toString(capacities)); |
| LOG.info("totalUsedSpace= " + totalUsed); |
| LOG.info("lengths = " + Arrays.toString(lengths) + ", #=" + lengths.length); |
| waitForHeartBeat(totalUsed, 2*capacities[0]*capacities.length, client, cluster); |
| |
| final Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf); |
| |
| { // run Balancer with min-block-size=50 |
| final BalancerParameters p = Balancer.Cli.parse(new String[] { |
| "-policy", BalancingPolicy.Node.INSTANCE.getName(), |
| "-threshold", "1" |
| }); |
| assertEquals(p.getBalancingPolicy(), BalancingPolicy.Node.INSTANCE); |
| assertEquals(p.getThreshold(), 1.0, 0.001); |
| |
| conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 50); |
| final int r = Balancer.run(namenodes, p, conf); |
| assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r); |
| } |
| |
| conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1); |
| |
| { // run Balancer with empty nodes as source nodes |
| final Set<String> sourceNodes = new HashSet<>(); |
| final List<DataNode> datanodes = cluster.getDataNodes(); |
| for(int i = capacities.length; i < datanodes.size(); i++) { |
| sourceNodes.add(datanodes.get(i).getDisplayName()); |
| } |
| final BalancerParameters p = Balancer.Cli.parse(new String[] { |
| "-policy", BalancingPolicy.Node.INSTANCE.getName(), |
| "-threshold", "1", |
| "-source", StringUtils.join(sourceNodes, ',') |
| }); |
| assertEquals(p.getBalancingPolicy(), BalancingPolicy.Node.INSTANCE); |
| assertEquals(p.getThreshold(), 1.0, 0.001); |
| assertEquals(p.getSourceNodes(), sourceNodes); |
| |
| conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 50); |
| final int r = Balancer.run(namenodes, p, conf); |
| assertEquals(ExitStatus.NO_MOVE_BLOCK.getExitCode(), r); |
| } |
| |
| { // run Balancer with a filled node as a source node |
| final Set<String> sourceNodes = new HashSet<>(); |
| final List<DataNode> datanodes = cluster.getDataNodes(); |
| sourceNodes.add(datanodes.get(0).getDisplayName()); |
| final BalancerParameters p = Balancer.Cli.parse(new String[] { |
| "-policy", BalancingPolicy.Node.INSTANCE.getName(), |
| "-threshold", "1", |
| "-source", StringUtils.join(sourceNodes, ',') |
| }); |
| assertEquals(p.getBalancingPolicy(), BalancingPolicy.Node.INSTANCE); |
| assertEquals(p.getThreshold(), 1.0, 0.001); |
| assertEquals(p.getSourceNodes(), sourceNodes); |
| |
| conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1); |
| final int r = Balancer.run(namenodes, p, conf); |
| assertEquals(ExitStatus.NO_MOVE_BLOCK.getExitCode(), r); |
| } |
| |
| { // run Balancer with all filled nodes as source nodes
| final Set<String> sourceNodes = new HashSet<>(); |
| final List<DataNode> datanodes = cluster.getDataNodes(); |
| for(int i = 0; i < capacities.length; i++) { |
| sourceNodes.add(datanodes.get(i).getDisplayName()); |
| } |
| final BalancerParameters p = Balancer.Cli.parse(new String[] { |
| "-policy", BalancingPolicy.Node.INSTANCE.getName(), |
| "-threshold", "1", |
| "-source", StringUtils.join(sourceNodes, ',') |
| }); |
| assertEquals(p.getBalancingPolicy(), BalancingPolicy.Node.INSTANCE); |
| assertEquals(p.getThreshold(), 1.0, 0.001); |
| assertEquals(p.getSourceNodes(), sourceNodes); |
| |
| conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1); |
| final int r = Balancer.run(namenodes, p, conf); |
| assertEquals(ExitStatus.SUCCESS.getExitCode(), r); |
| } |
| } |
| |
| /**
| * Test that the Balancer runs fine when logging in with a keytab in a
| * Kerberized environment. Reuses testUnknownDatanode here for basic
| * functionality testing.
| */
| @Test(timeout = 300000) |
| public void testBalancerWithKeytabs() throws Exception { |
| final Configuration conf = new HdfsConfiguration(); |
| try { |
| initSecureConf(conf); |
| final UserGroupInformation ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI( |
| principal, keytabFile.getAbsolutePath()); |
| ugi.doAs(new PrivilegedExceptionAction<Void>() { |
| @Override |
| public Void run() throws Exception { |
| // verify that balancer runs Ok. |
| testUnknownDatanode(conf); |
| // verify that UGI was logged in using keytab. |
| assertTrue(UserGroupInformation.isLoginKeytabBased()); |
| return null; |
| } |
| }); |
| } finally { |
| // Reset UGI so that other tests are not affected. |
| UserGroupInformation.reset(); |
| UserGroupInformation.setConfiguration(new Configuration()); |
| } |
| } |
| |
| private static int numGetBlocksCalls; |
| private static long startGetBlocksTime, endGetBlocksTime; |
| |
| /**
| * Spy on the NameNode's FSNamesystem to count getBlocks() calls and record
| * the time span over which they are made.
| */
| private void spyFSNamesystem(NameNode nn) throws IOException {
| FSNamesystem fsnSpy = NameNodeAdapter.spyOnNamesystem(nn); |
| numGetBlocksCalls = 0; |
| endGetBlocksTime = startGetBlocksTime = Time.monotonicNow(); |
| doAnswer(new Answer<BlocksWithLocations>() { |
| @Override |
| public BlocksWithLocations answer(InvocationOnMock invocation) |
| throws Throwable { |
| BlocksWithLocations blk = |
| (BlocksWithLocations)invocation.callRealMethod(); |
| endGetBlocksTime = Time.monotonicNow(); |
| numGetBlocksCalls++; |
| return blk; |
| }}).when(fsnSpy).getBlocks(any(DatanodeID.class), anyLong()); |
| } |
| |
| /**
| * Test that the Balancer spreads its getBlocks RPCs to the NameNode
| * over time, in order to avoid saturating the NN's RPC queue.
| */
| void testBalancerRPCDelay() throws Exception { |
| final Configuration conf = new HdfsConfiguration(); |
| initConf(conf); |
| conf.setInt(DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_KEY, 30); |
| |
| int numDNs = 20; |
| long[] capacities = new long[numDNs]; |
| String[] racks = new String[numDNs]; |
| for(int i = 0; i < numDNs; i++) { |
| capacities[i] = CAPACITY; |
| racks[i] = (i < numDNs/2 ? RACK0 : RACK1); |
| } |
| doTest(conf, capacities, racks, CAPACITY, RACK2, |
| new PortNumberBasedNodes(3, 0, 0), false, false, true); |
| assertTrue("Number of getBlocks should be not less than " + |
| Dispatcher.BALANCER_NUM_RPC_PER_SEC, |
| numGetBlocksCalls > Dispatcher.BALANCER_NUM_RPC_PER_SEC); |
| long d = 1 + endGetBlocksTime - startGetBlocksTime; |
| LOG.info("Balancer executed " + numGetBlocksCalls |
| + " getBlocks in " + d + " msec."); |
| assertTrue("Expected BALANCER_NUM_RPC_PER_SEC = " + |
| Dispatcher.BALANCER_NUM_RPC_PER_SEC, |
| (numGetBlocksCalls * 1000 / d) < Dispatcher.BALANCER_NUM_RPC_PER_SEC); |
| } |
| |
| /**
| * Run a few representative balancer tests standalone.
| * @param args ignored
| */
| public static void main(String[] args) throws Exception { |
| TestBalancer balancerTest = new TestBalancer(); |
| balancerTest.testBalancer0(); |
| balancerTest.testBalancer1(); |
| balancerTest.testBalancer2(); |
| } |
| } |