| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with this |
| * work for additional information regarding copyright ownership. The ASF |
| * licenses this file to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * <p> |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * <p> |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| * License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| package org.apache.hadoop.hdfs.server.diskbalancer; |
| |
| import org.apache.hadoop.util.Preconditions; |
| import java.util.function.Supplier; |
| import org.apache.commons.codec.digest.DigestUtils; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.conf.ReconfigurationException; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.fs.StorageType; |
| import org.apache.hadoop.hdfs.DFSConfigKeys; |
| import org.apache.hadoop.hdfs.DFSTestUtil; |
| import org.apache.hadoop.hdfs.HdfsConfiguration; |
| import org.apache.hadoop.hdfs.MiniDFSCluster; |
| import org.apache.hadoop.hdfs.MiniDFSNNTopology; |
| import org.apache.hadoop.hdfs.protocol.ExtendedBlock; |
| import org.apache.hadoop.hdfs.server.balancer.TestBalancer; |
| import org.apache.hadoop.hdfs.server.datanode.DataNode; |
| import org.apache.hadoop.hdfs.server.datanode.DiskBalancer; |
| import org.apache.hadoop.hdfs.server.datanode.DiskBalancer.DiskBalancerMover; |
| import org.apache.hadoop.hdfs.server.datanode.DiskBalancer.VolumePair; |
| import org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkItem; |
| import org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus; |
| import org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus.Result; |
| import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; |
| import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; |
| import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl; |
| import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ClusterConnector; |
| import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ConnectorFactory; |
| import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerCluster; |
| import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerDataNode; |
| import org.apache.hadoop.hdfs.server.diskbalancer.planner.NodePlan; |
| import org.apache.hadoop.test.GenericTestUtils; |
| import org.apache.hadoop.util.Time; |
| import org.junit.Assert; |
| import org.junit.Test; |
| import org.mockito.Mockito; |
| import org.mockito.invocation.InvocationOnMock; |
| import org.mockito.stubbing.Answer; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.TimeoutException; |
| import java.util.concurrent.atomic.AtomicInteger; |
| |
| import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY; |
| import static org.hamcrest.core.Is.is; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertNotNull; |
| import static org.junit.Assert.assertThat; |
| import static org.junit.Assert.assertTrue; |
| import static org.mockito.ArgumentMatchers.any; |
| import static org.mockito.Mockito.doAnswer; |
| |
| /** |
| * Test Disk Balancer. |
| */ |
| public class TestDiskBalancer { |
| |
| private static final String PLAN_FILE = "/system/current.plan.json"; |
| static final Logger LOG = LoggerFactory.getLogger(TestDiskBalancer.class); |
| |
| @Test |
| public void testDiskBalancerNameNodeConnectivity() throws Exception { |
| Configuration conf = new HdfsConfiguration(); |
| conf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true); |
| final int numDatanodes = 2; |
| MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) |
| .numDataNodes(numDatanodes).build(); |
| try { |
| cluster.waitActive(); |
| ClusterConnector nameNodeConnector = |
| ConnectorFactory.getCluster(cluster.getFileSystem(0).getUri(), conf); |
| |
| DiskBalancerCluster diskBalancerCluster = |
| new DiskBalancerCluster(nameNodeConnector); |
| diskBalancerCluster.readClusterInfo(); |
| assertEquals(diskBalancerCluster.getNodes().size(), numDatanodes); |
| DataNode dnNode = cluster.getDataNodes().get(0); |
| DiskBalancerDataNode dbDnNode = |
| diskBalancerCluster.getNodeByUUID(dnNode.getDatanodeUuid()); |
| assertEquals(dnNode.getDatanodeUuid(), dbDnNode.getDataNodeUUID()); |
| assertEquals(dnNode.getDatanodeId().getIpAddr(), |
| dbDnNode.getDataNodeIP()); |
| assertEquals(dnNode.getDatanodeId().getHostName(), |
| dbDnNode.getDataNodeName()); |
| try (FsDatasetSpi.FsVolumeReferences ref = dnNode.getFSDataset() |
| .getFsVolumeReferences()) { |
| assertEquals(ref.size(), dbDnNode.getVolumeCount()); |
| } |
| |
| // Shutdown the DN first, to verify that calling diskbalancer APIs on |
| // uninitialized DN doesn't NPE |
| dnNode.shutdown(); |
| assertEquals("", dnNode.getDiskBalancerStatus()); |
| } finally { |
| cluster.shutdown(); |
| } |
| } |
| |
| /** |
| * This test simulates a real Data node working with DiskBalancer. |
| * <p> |
| * Here is the overview of this test. |
| * <p> |
| * 1. Write a bunch of blocks and move them to one disk to create imbalance. |
| * 2. Rewrite the capacity of the disks in DiskBalancer Model so that planner |
| * will produce a move plan. 3. Execute the move plan and wait unitl the plan |
| * is done. 4. Verify the source disk has blocks now. |
| * |
| * @throws Exception |
| */ |
| @Test |
| public void testDiskBalancerEndToEnd() throws Exception { |
| |
| Configuration conf = new HdfsConfiguration(); |
| conf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true); |
| final int blockCount = 100; |
| final int blockSize = 1024; |
| final int diskCount = 2; |
| final int dataNodeCount = 1; |
| final int dataNodeIndex = 0; |
| final int sourceDiskIndex = 0; |
| final long cap = blockSize * 2L * blockCount; |
| |
| MiniDFSCluster cluster = new ClusterBuilder() |
| .setBlockCount(blockCount) |
| .setBlockSize(blockSize) |
| .setDiskCount(diskCount) |
| .setNumDatanodes(dataNodeCount) |
| .setConf(conf) |
| .setCapacities(new long[] {cap, cap}) |
| .build(); |
| try { |
| DataMover dataMover = new DataMover(cluster, dataNodeIndex, |
| sourceDiskIndex, conf, blockSize, blockCount); |
| dataMover.moveDataToSourceDisk(); |
| NodePlan plan = dataMover.generatePlan(); |
| dataMover.executePlan(plan); |
| dataMover.verifyPlanExectionDone(); |
| dataMover.verifyAllVolumesHaveData(true); |
| dataMover.verifyTolerance(plan, 0, sourceDiskIndex, 10); |
| } finally { |
| cluster.shutdown(); |
| } |
| } |
| |
| @Test |
| public void testDiskBalancerWithFederatedCluster() throws Exception { |
| |
| Configuration conf = new HdfsConfiguration(); |
| conf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true); |
| final int blockCount = 100; |
| final int blockSize = 1024; |
| final int diskCount = 2; |
| final int dataNodeCount = 1; |
| final int dataNodeIndex = 0; |
| final int sourceDiskIndex = 0; |
| final long cap = blockSize * 3L * blockCount; |
| |
| conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize); |
| conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, blockSize); |
| |
| final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) |
| .nnTopology(MiniDFSNNTopology.simpleFederatedTopology(2)) |
| .numDataNodes(dataNodeCount) |
| .storagesPerDatanode(diskCount) |
| .storageCapacities(new long[] {cap, cap}) |
| .build(); |
| cluster.waitActive(); |
| |
| DFSTestUtil.setFederatedConfiguration(cluster, conf); |
| |
| final String fileName = "/tmp.txt"; |
| final Path filePath = new Path(fileName); |
| long fileLen = blockCount * blockSize; |
| |
| |
| FileSystem fs = cluster.getFileSystem(0); |
| TestBalancer.createFile(cluster, filePath, fileLen, (short) 1, |
| 0); |
| DFSTestUtil.waitReplication(fs, filePath, (short) 1); |
| |
| fs = cluster.getFileSystem(1); |
| TestBalancer.createFile(cluster, filePath, fileLen, (short) 1, |
| 1); |
| DFSTestUtil.waitReplication(fs, filePath, (short) 1); |
| |
| try { |
| DataMover dataMover = new DataMover(cluster, dataNodeIndex, |
| sourceDiskIndex, conf, blockSize, blockCount); |
| dataMover.moveDataToSourceDisk(); |
| NodePlan plan = dataMover.generatePlan(); |
| dataMover.executePlan(plan); |
| dataMover.verifyPlanExectionDone(); |
| dataMover.verifyAllVolumesHaveData(true); |
| dataMover.verifyTolerance(plan, 0, sourceDiskIndex, 10); |
| } finally { |
| cluster.shutdown(); |
| } |
| } |
| |
| @Test |
| public void testDiskBalancerComputeDelay() throws Exception { |
| |
| Configuration conf = new HdfsConfiguration(); |
| conf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true); |
| |
| final int blockCount = 100; |
| final int blockSize = 11 * 1024 * 1024; |
| final int diskCount = 2; |
| final int dataNodeCount = 1; |
| final int dataNodeIndex = 0; |
| final long cap = blockSize * 2L * blockCount; |
| |
| conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize); |
| conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, blockSize); |
| |
| final MiniDFSCluster cluster = new ClusterBuilder() |
| .setBlockCount(blockCount).setBlockSize(blockSize) |
| .setDiskCount(diskCount).setNumDatanodes(dataNodeCount).setConf(conf) |
| .setCapacities(new long[] {cap, cap }).build(); |
| |
| try { |
| DataNode node = cluster.getDataNodes().get(dataNodeIndex); |
| |
| final FsDatasetSpi<?> fsDatasetSpy = Mockito.spy(node.getFSDataset()); |
| DiskBalancerWorkItem item = Mockito.spy(new DiskBalancerWorkItem()); |
| // Mocking bandwidth as 10mb/sec. |
| Mockito.doReturn((long) 10).when(item).getBandwidth(); |
| |
| doAnswer(new Answer<Object>() { |
| public Object answer(InvocationOnMock invocation) { |
| try { |
| node.getFSDataset().moveBlockAcrossVolumes( |
| (ExtendedBlock) invocation.getArguments()[0], |
| (FsVolumeSpi) invocation.getArguments()[1]); |
| } catch (Exception e) { |
| LOG.error(e.getMessage()); |
| } |
| return null; |
| } |
| }).when(fsDatasetSpy).moveBlockAcrossVolumes(any(ExtendedBlock.class), |
| any(FsVolumeSpi.class)); |
| |
| DiskBalancerMover diskBalancerMover = new DiskBalancerMover(fsDatasetSpy, |
| conf); |
| |
| diskBalancerMover.setRunnable(); |
| |
| // bytesCopied - 20 * 1024 *1024 byteCopied. |
| // timeUsed - 1200 in milliseconds |
| // item - set DiskBalancerWorkItem bandwidth as 10 |
| // Expect return sleep delay in Milliseconds. sleep value = bytesCopied / |
| // (1024*1024*bandwidth in MB/milli) - timeUsed; |
| long val = diskBalancerMover.computeDelay(20 * 1024 * 1024, 1200, item); |
| Assert.assertEquals(val, (long) 800); |
| } catch (Exception e) { |
| Assert.fail("Unexpected exception: " + e); |
| } finally { |
| if (cluster != null) { |
| cluster.shutdown(); |
| } |
| } |
| } |
| |
| @Test |
| public void testDiskBalancerWithFedClusterWithOneNameServiceEmpty() throws |
| Exception { |
| Configuration conf = new HdfsConfiguration(); |
| conf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true); |
| final int blockCount = 100; |
| final int blockSize = 1024; |
| final int diskCount = 2; |
| final int dataNodeCount = 1; |
| final int dataNodeIndex = 0; |
| final int sourceDiskIndex = 0; |
| final long cap = blockSize * 3L * blockCount; |
| |
| conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize); |
| conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, blockSize); |
| |
| final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) |
| .nnTopology(MiniDFSNNTopology.simpleFederatedTopology(2)) |
| .numDataNodes(dataNodeCount) |
| .storagesPerDatanode(diskCount) |
| .storageCapacities(new long[] {cap, cap}) |
| .build(); |
| cluster.waitActive(); |
| |
| DFSTestUtil.setFederatedConfiguration(cluster, conf); |
| |
| final String fileName = "/tmp.txt"; |
| final Path filePath = new Path(fileName); |
| long fileLen = blockCount * blockSize; |
| |
| //Writing data only to one nameservice. |
| FileSystem fs = cluster.getFileSystem(0); |
| TestBalancer.createFile(cluster, filePath, fileLen, (short) 1, |
| 0); |
| DFSTestUtil.waitReplication(fs, filePath, (short) 1); |
| |
| GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer |
| .captureLogs(DiskBalancer.LOG); |
| |
| try { |
| DataMover dataMover = new DataMover(cluster, dataNodeIndex, |
| sourceDiskIndex, conf, blockSize, blockCount); |
| dataMover.moveDataToSourceDisk(); |
| NodePlan plan = dataMover.generatePlan(); |
| dataMover.executePlan(plan); |
| dataMover.verifyPlanExectionDone(); |
| // Because here we have one nameservice empty, don't check blockPoolCount. |
| dataMover.verifyAllVolumesHaveData(false); |
| } finally { |
| String logOut = logCapturer.getOutput(); |
| Assert.assertTrue("Wrong log: " + logOut, logOut.contains( |
| "NextBlock call returned null. No valid block to copy.")); |
| cluster.shutdown(); |
| } |
| } |
| |
| @Test |
| public void testBalanceDataBetweenMultiplePairsOfVolumes() |
| throws Exception { |
| |
| Configuration conf = new HdfsConfiguration(); |
| conf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true); |
| final int blockCount = 1000; |
| final int blockSize = 1024; |
| |
| // create 3 disks, that means we will have 2 plans |
| // Move Data from disk0->disk1 and disk0->disk2. |
| final int diskCount = 3; |
| final int dataNodeCount = 1; |
| final int dataNodeIndex = 0; |
| final int sourceDiskIndex = 0; |
| final long cap = blockSize * 2L * blockCount; |
| |
| MiniDFSCluster cluster = new ClusterBuilder() |
| .setBlockCount(blockCount) |
| .setBlockSize(blockSize) |
| .setDiskCount(diskCount) |
| .setNumDatanodes(dataNodeCount) |
| .setConf(conf) |
| .setCapacities(new long[] {cap, cap, cap}) |
| .build(); |
| |
| try { |
| DataMover dataMover = new DataMover(cluster, dataNodeIndex, |
| sourceDiskIndex, conf, blockSize, blockCount); |
| dataMover.moveDataToSourceDisk(); |
| NodePlan plan = dataMover.generatePlan(); |
| |
| // 3 disks , The plan should move data both disks, |
| // so we must have 2 plan steps. |
| assertEquals(plan.getVolumeSetPlans().size(), 2); |
| |
| dataMover.executePlan(plan); |
| dataMover.verifyPlanExectionDone(); |
| dataMover.verifyAllVolumesHaveData(true); |
| dataMover.verifyTolerance(plan, 0, sourceDiskIndex, 10); |
| } finally { |
| cluster.shutdown(); |
| } |
| } |
| |
| /** |
| * Test disk balancer behavior when one of the disks involved |
| * in balancing operation is removed after submitting the plan. |
| * @throws Exception |
| */ |
| @Test |
| public void testDiskBalancerWhenRemovingVolumes() throws Exception { |
| |
| Configuration conf = new HdfsConfiguration(); |
| conf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true); |
| |
| final int blockCount = 100; |
| final int blockSize = 1024; |
| final int diskCount = 2; |
| final int dataNodeCount = 1; |
| final int dataNodeIndex = 0; |
| final int sourceDiskIndex = 0; |
| final long cap = blockSize * 2L * blockCount; |
| |
| MiniDFSCluster cluster = new ClusterBuilder() |
| .setBlockCount(blockCount) |
| .setBlockSize(blockSize) |
| .setDiskCount(diskCount) |
| .setNumDatanodes(dataNodeCount) |
| .setConf(conf) |
| .setCapacities(new long[] {cap, cap}) |
| .build(); |
| |
| try { |
| DataMover dataMover = new DataMover(cluster, dataNodeIndex, |
| sourceDiskIndex, conf, blockSize, blockCount); |
| dataMover.moveDataToSourceDisk(); |
| NodePlan plan = dataMover.generatePlan(); |
| dataMover.executePlanDuringDiskRemove(plan); |
| dataMover.verifyAllVolumesHaveData(true); |
| dataMover.verifyTolerance(plan, 0, sourceDiskIndex, 10); |
| } catch (Exception e) { |
| Assert.fail("Unexpected exception: " + e); |
| } finally { |
| if (cluster != null) { |
| cluster.shutdown(); |
| } |
| } |
| } |
| |
| /** |
| * Helper class that allows us to create different kinds of MiniDFSClusters |
| * and populate data. |
| */ |
| static class ClusterBuilder { |
| private Configuration conf; |
| private int blockSize; |
| private int numDatanodes; |
| private int fileLen; |
| private int blockCount; |
| private int diskCount; |
| private long[] capacities; |
| |
| public ClusterBuilder setConf(Configuration conf) { |
| this.conf = conf; |
| return this; |
| } |
| |
| public ClusterBuilder setBlockSize(int blockSize) { |
| this.blockSize = blockSize; |
| return this; |
| } |
| |
| public ClusterBuilder setNumDatanodes(int datanodeCount) { |
| this.numDatanodes = datanodeCount; |
| return this; |
| } |
| |
| public ClusterBuilder setBlockCount(int blockCount) { |
| this.blockCount = blockCount; |
| return this; |
| } |
| |
| public ClusterBuilder setDiskCount(int diskCount) { |
| this.diskCount = diskCount; |
| return this; |
| } |
| |
| private ClusterBuilder setCapacities(final long[] caps) { |
| this.capacities = caps; |
| return this; |
| } |
| |
| private StorageType[] getStorageTypes(int diskCount) { |
| Preconditions.checkState(diskCount > 0); |
| StorageType[] array = new StorageType[diskCount]; |
| for (int x = 0; x < diskCount; x++) { |
| array[x] = StorageType.DISK; |
| } |
| return array; |
| } |
| |
| public MiniDFSCluster build() throws IOException, TimeoutException, |
| InterruptedException { |
| Preconditions.checkNotNull(this.conf); |
| Preconditions.checkState(blockSize > 0); |
| Preconditions.checkState(numDatanodes > 0); |
| fileLen = blockCount * blockSize; |
| Preconditions.checkState(fileLen > 0); |
| conf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true); |
| conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize); |
| conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, blockSize); |
| conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L); |
| |
| final String fileName = "/tmp.txt"; |
| Path filePath = new Path(fileName); |
| fileLen = blockCount * blockSize; |
| |
| |
| // Write a file and restart the cluster |
| MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) |
| .numDataNodes(numDatanodes) |
| .storageCapacities(capacities) |
| .storageTypes(getStorageTypes(diskCount)) |
| .storagesPerDatanode(diskCount) |
| .build(); |
| generateData(filePath, cluster); |
| cluster.restartDataNodes(); |
| cluster.waitActive(); |
| return cluster; |
| } |
| |
| private void generateData(Path filePath, MiniDFSCluster cluster) |
| throws IOException, InterruptedException, TimeoutException { |
| cluster.waitActive(); |
| FileSystem fs = cluster.getFileSystem(0); |
| TestBalancer.createFile(cluster, filePath, fileLen, (short) 1, |
| numDatanodes - 1); |
| DFSTestUtil.waitReplication(fs, filePath, (short) 1); |
| cluster.restartDataNodes(); |
| cluster.waitActive(); |
| } |
| } |
| |
| class DataMover { |
| private final MiniDFSCluster cluster; |
| private final int sourceDiskIndex; |
| private final int dataNodeIndex; |
| private final Configuration conf; |
| private final int blockCount; |
| private final int blockSize; |
| private DataNode node; |
| |
| /** |
| * Constructs a DataMover class. |
| * |
| * @param cluster - MiniDFSCluster. |
| * @param dataNodeIndex - Datanode to operate against. |
| * @param sourceDiskIndex - source Disk Index. |
| */ |
| public DataMover(MiniDFSCluster cluster, int dataNodeIndex, int |
| sourceDiskIndex, Configuration conf, int blockSize, int |
| blockCount) { |
| this.cluster = cluster; |
| this.dataNodeIndex = dataNodeIndex; |
| this.node = cluster.getDataNodes().get(dataNodeIndex); |
| this.sourceDiskIndex = sourceDiskIndex; |
| this.conf = conf; |
| this.blockCount = blockCount; |
| this.blockSize = blockSize; |
| } |
| |
| /** |
| * Moves all data to a source disk to create disk imbalance so we can run a |
| * planner. |
| * |
| * @throws IOException |
| */ |
| public void moveDataToSourceDisk() throws IOException { |
| moveAllDataToDestDisk(this.node, sourceDiskIndex); |
| cluster.restartDataNodes(); |
| cluster.waitActive(); |
| |
| } |
| |
| /** |
| * Moves all data in the data node to one disk. |
| * |
| * @param dataNode - Datanode |
| * @param destDiskindex - Index of the destination disk. |
| */ |
| private void moveAllDataToDestDisk(DataNode dataNode, int destDiskindex) |
| throws IOException { |
| Preconditions.checkNotNull(dataNode); |
| Preconditions.checkState(destDiskindex >= 0); |
| try (FsDatasetSpi.FsVolumeReferences refs = |
| dataNode.getFSDataset().getFsVolumeReferences()) { |
| if (refs.size() <= destDiskindex) { |
| throw new IllegalArgumentException("Invalid Disk index."); |
| } |
| FsVolumeImpl dest = (FsVolumeImpl) refs.get(destDiskindex); |
| for (int x = 0; x < refs.size(); x++) { |
| if (x == destDiskindex) { |
| continue; |
| } |
| FsVolumeImpl source = (FsVolumeImpl) refs.get(x); |
| DiskBalancerTestUtil.moveAllDataToDestVolume(dataNode.getFSDataset(), |
| source, dest); |
| |
| } |
| } |
| } |
| |
| /** |
| * Generates a NodePlan for the datanode specified. |
| * |
| * @return NodePlan. |
| */ |
| public NodePlan generatePlan() throws Exception { |
| |
| // Start up a disk balancer and read the cluster info. |
| node = cluster.getDataNodes().get(dataNodeIndex); |
| ClusterConnector nameNodeConnector = |
| ConnectorFactory.getCluster(cluster.getFileSystem(dataNodeIndex) |
| .getUri(), conf); |
| |
| DiskBalancerCluster diskBalancerCluster = |
| new DiskBalancerCluster(nameNodeConnector); |
| diskBalancerCluster.readClusterInfo(); |
| List<DiskBalancerDataNode> nodesToProcess = new LinkedList<>(); |
| |
| // Pick a node to process. |
| nodesToProcess.add(diskBalancerCluster.getNodeByUUID( |
| node.getDatanodeUuid())); |
| diskBalancerCluster.setNodesToProcess(nodesToProcess); |
| |
| // Compute a plan. |
| List<NodePlan> clusterplan = diskBalancerCluster.computePlan(0.0f); |
| |
| // Now we must have a plan,since the node is imbalanced and we |
| // asked the disk balancer to create a plan. |
| assertTrue(clusterplan.size() == 1); |
| |
| NodePlan plan = clusterplan.get(0); |
| plan.setNodeUUID(node.getDatanodeUuid()); |
| plan.setTimeStamp(Time.now()); |
| |
| assertNotNull(plan.getVolumeSetPlans()); |
| assertTrue(plan.getVolumeSetPlans().size() > 0); |
| plan.getVolumeSetPlans().get(0).setTolerancePercent(10); |
| return plan; |
| } |
| |
| /** |
| * Waits for a plan executing to finish. |
| */ |
| public void executePlan(NodePlan plan) throws |
| IOException, TimeoutException, InterruptedException { |
| |
| node = cluster.getDataNodes().get(dataNodeIndex); |
| String planJson = plan.toJson(); |
| String planID = DigestUtils.sha1Hex(planJson); |
| |
| // Submit the plan and wait till the execution is done. |
| node.submitDiskBalancerPlan(planID, 1, PLAN_FILE, planJson, |
| false); |
| String jmxString = node.getDiskBalancerStatus(); |
| assertNotNull(jmxString); |
| DiskBalancerWorkStatus status = |
| DiskBalancerWorkStatus.parseJson(jmxString); |
| DiskBalancerWorkStatus realStatus = node.queryDiskBalancerPlan(); |
| assertEquals(realStatus.getPlanID(), status.getPlanID()); |
| |
| GenericTestUtils.waitFor(new Supplier<Boolean>() { |
| @Override |
| public Boolean get() { |
| try { |
| return node.queryDiskBalancerPlan().getResult() == |
| DiskBalancerWorkStatus.Result.PLAN_DONE; |
| } catch (IOException ex) { |
| return false; |
| } |
| } |
| }, 1000, 100000); |
| } |
| |
| public void executePlanDuringDiskRemove(NodePlan plan) throws |
| IOException, TimeoutException, InterruptedException { |
| CountDownLatch createWorkPlanLatch = new CountDownLatch(1); |
| CountDownLatch removeDiskLatch = new CountDownLatch(1); |
| AtomicInteger errorCount = new AtomicInteger(0); |
| |
| LOG.info("FSDataSet: " + node.getFSDataset()); |
| final FsDatasetSpi<?> fsDatasetSpy = Mockito.spy(node.getFSDataset()); |
| doAnswer(new Answer<Object>() { |
| public Object answer(InvocationOnMock invocation) { |
| try { |
| node.getFSDataset().moveBlockAcrossVolumes( |
| (ExtendedBlock)invocation.getArguments()[0], |
| (FsVolumeSpi) invocation.getArguments()[1]); |
| } catch (Exception e) { |
| errorCount.incrementAndGet(); |
| } |
| return null; |
| } |
| }).when(fsDatasetSpy).moveBlockAcrossVolumes( |
| any(ExtendedBlock.class), any(FsVolumeSpi.class)); |
| |
| DiskBalancerMover diskBalancerMover = new DiskBalancerMover( |
| fsDatasetSpy, conf); |
| diskBalancerMover.setRunnable(); |
| |
| DiskBalancerMover diskBalancerMoverSpy = Mockito.spy(diskBalancerMover); |
| doAnswer(new Answer<Object>() { |
| public Object answer(InvocationOnMock invocation) { |
| createWorkPlanLatch.countDown(); |
| LOG.info("Waiting for the disk removal!"); |
| try { |
| removeDiskLatch.await(); |
| } catch (InterruptedException e) { |
| LOG.info("Encountered " + e); |
| } |
| LOG.info("Got disk removal notification, resuming copyBlocks!"); |
| diskBalancerMover.copyBlocks((VolumePair)(invocation |
| .getArguments()[0]), (DiskBalancerWorkItem)(invocation |
| .getArguments()[1])); |
| return null; |
| } |
| }).when(diskBalancerMoverSpy).copyBlocks( |
| any(VolumePair.class), any(DiskBalancerWorkItem.class)); |
| |
| DiskBalancer diskBalancer = new DiskBalancer(node.getDatanodeUuid(), |
| conf, diskBalancerMoverSpy); |
| |
| List<String> oldDirs = new ArrayList<String>(node.getConf(). |
| getTrimmedStringCollection(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY)); |
| final String newDirs = oldDirs.get(0); |
| LOG.info("Reconfigure newDirs:" + newDirs); |
| Thread reconfigThread = new Thread() { |
| public void run() { |
| try { |
| LOG.info("Waiting for work plan creation!"); |
| createWorkPlanLatch.await(); |
| LOG.info("Work plan created. Removing disk!"); |
| assertThat( |
| "DN did not update its own config", node. |
| reconfigurePropertyImpl(DFS_DATANODE_DATA_DIR_KEY, newDirs), |
| is(node.getConf().get(DFS_DATANODE_DATA_DIR_KEY))); |
| Thread.sleep(1000); |
| LOG.info("Removed disk!"); |
| removeDiskLatch.countDown(); |
| } catch (ReconfigurationException | InterruptedException e) { |
| Assert.fail("Unexpected error while reconfiguring: " + e); |
| } |
| } |
| }; |
| reconfigThread.start(); |
| |
| String planJson = plan.toJson(); |
| String planID = DigestUtils.sha1Hex(planJson); |
| diskBalancer.submitPlan(planID, 1, PLAN_FILE, planJson, false); |
| |
| GenericTestUtils.waitFor(new Supplier<Boolean>() { |
| @Override |
| public Boolean get() { |
| try { |
| LOG.info("Work Status: " + diskBalancer. |
| queryWorkStatus().toJsonString()); |
| Result result = diskBalancer.queryWorkStatus().getResult(); |
| return (result == Result.PLAN_DONE); |
| } catch (IOException e) { |
| return false; |
| } |
| } |
| }, 1000, 100000); |
| |
| assertTrue("Disk balancer operation hit max errors!", errorCount.get() <= |
| DFSConfigKeys.DFS_DISK_BALANCER_MAX_DISK_ERRORS_DEFAULT); |
| createWorkPlanLatch.await(); |
| removeDiskLatch.await(); |
| } |
| |
| /** |
| * Verifies the Plan Execution has been done. |
| */ |
| public void verifyPlanExectionDone() throws IOException { |
| node = cluster.getDataNodes().get(dataNodeIndex); |
| assertEquals(node.queryDiskBalancerPlan().getResult(), |
| DiskBalancerWorkStatus.Result.PLAN_DONE); |
| } |
| |
| /** |
| * Once diskBalancer is run, all volumes mush has some data. |
| */ |
| public void verifyAllVolumesHaveData(boolean checkblockPoolCount) throws |
| IOException { |
| node = cluster.getDataNodes().get(dataNodeIndex); |
| try (FsDatasetSpi.FsVolumeReferences refs = |
| node.getFSDataset().getFsVolumeReferences()) { |
| for (FsVolumeSpi volume : refs) { |
| assertTrue(DiskBalancerTestUtil.getBlockCount(volume, checkblockPoolCount) > 0); |
| LOG.info("{} : Block Count : {}", refs, DiskBalancerTestUtil |
| .getBlockCount(volume, checkblockPoolCount)); |
| } |
| } |
| } |
| |
| /** |
| * Verifies that tolerance values are honored correctly. |
| */ |
| public void verifyTolerance(NodePlan plan, int planIndex, int |
| sourceDiskIndex, int tolerance) throws IOException { |
| // Tolerance |
| long delta = (plan.getVolumeSetPlans().get(planIndex).getBytesToMove() |
| * tolerance) / 100; |
| FsVolumeImpl volume = null; |
| try (FsDatasetSpi.FsVolumeReferences refs = |
| node.getFSDataset().getFsVolumeReferences()) { |
| volume = (FsVolumeImpl) refs.get(sourceDiskIndex); |
| assertTrue(DiskBalancerTestUtil.getBlockCount(volume, true) > 0); |
| |
| assertTrue((DiskBalancerTestUtil.getBlockCount(volume, true) * |
| (blockSize + delta)) >= plan.getVolumeSetPlans().get(0) |
| .getBytesToMove()); |
| } |
| } |
| } |
| } |