/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.hdfs.server.diskbalancer;
import org.apache.hadoop.util.Preconditions;
import java.util.function.Supplier;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.ReconfigurationException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.balancer.TestBalancer;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DiskBalancer;
import org.apache.hadoop.hdfs.server.datanode.DiskBalancer.DiskBalancerMover;
import org.apache.hadoop.hdfs.server.datanode.DiskBalancer.VolumePair;
import org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkItem;
import org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus;
import org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus.Result;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ClusterConnector;
import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ConnectorFactory;
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerCluster;
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerDataNode;
import org.apache.hadoop.hdfs.server.diskbalancer.planner.NodePlan;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Time;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
/**
* Test Disk Balancer.
*/
public class TestDiskBalancer {
private static final String PLAN_FILE = "/system/current.plan.json";
static final Logger LOG = LoggerFactory.getLogger(TestDiskBalancer.class);
@Test
public void testDiskBalancerNameNodeConnectivity() throws Exception {
Configuration conf = new HdfsConfiguration();
conf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true);
final int numDatanodes = 2;
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(numDatanodes).build();
try {
cluster.waitActive();
ClusterConnector nameNodeConnector =
ConnectorFactory.getCluster(cluster.getFileSystem(0).getUri(), conf);
DiskBalancerCluster diskBalancerCluster =
new DiskBalancerCluster(nameNodeConnector);
diskBalancerCluster.readClusterInfo();
assertEquals(diskBalancerCluster.getNodes().size(), numDatanodes);
DataNode dnNode = cluster.getDataNodes().get(0);
DiskBalancerDataNode dbDnNode =
diskBalancerCluster.getNodeByUUID(dnNode.getDatanodeUuid());
assertEquals(dnNode.getDatanodeUuid(), dbDnNode.getDataNodeUUID());
assertEquals(dnNode.getDatanodeId().getIpAddr(),
dbDnNode.getDataNodeIP());
assertEquals(dnNode.getDatanodeId().getHostName(),
dbDnNode.getDataNodeName());
try (FsDatasetSpi.FsVolumeReferences ref = dnNode.getFSDataset()
.getFsVolumeReferences()) {
assertEquals(ref.size(), dbDnNode.getVolumeCount());
}
// Shut down the DN first, to verify that calling disk balancer APIs on an
// uninitialized DN doesn't NPE.
dnNode.shutdown();
assertEquals("", dnNode.getDiskBalancerStatus());
} finally {
cluster.shutdown();
}
}
/**
* This test simulates a real DataNode working with DiskBalancer.
* <p>
* Here is an overview of this test:
* <p>
* 1. Write a bunch of blocks and move them to one disk to create imbalance.
* 2. Rewrite the capacity of the disks in the DiskBalancer model so that the
* planner will produce a move plan.
* 3. Execute the move plan and wait until the plan is done.
* 4. Verify that all the volumes have blocks now.
*
* @throws Exception
*/
@Test
public void testDiskBalancerEndToEnd() throws Exception {
Configuration conf = new HdfsConfiguration();
conf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true);
final int blockCount = 100;
final int blockSize = 1024;
final int diskCount = 2;
final int dataNodeCount = 1;
final int dataNodeIndex = 0;
final int sourceDiskIndex = 0;
final long cap = blockSize * 2L * blockCount;
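// cap is twice the total data written (blockSize * blockCount), so the whole
// test file can fit on a single volume when the imbalance is created below.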
MiniDFSCluster cluster = new ClusterBuilder()
.setBlockCount(blockCount)
.setBlockSize(blockSize)
.setDiskCount(diskCount)
.setNumDatanodes(dataNodeCount)
.setConf(conf)
.setCapacities(new long[] {cap, cap})
.build();
try {
DataMover dataMover = new DataMover(cluster, dataNodeIndex,
sourceDiskIndex, conf, blockSize, blockCount);
dataMover.moveDataToSourceDisk();
NodePlan plan = dataMover.generatePlan();
dataMover.executePlan(plan);
dataMover.verifyPlanExecutionDone();
dataMover.verifyAllVolumesHaveData(true);
dataMover.verifyTolerance(plan, 0, sourceDiskIndex, 10);
} finally {
cluster.shutdown();
}
}
@Test
public void testDiskBalancerWithFederatedCluster() throws Exception {
Configuration conf = new HdfsConfiguration();
conf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true);
final int blockCount = 100;
final int blockSize = 1024;
final int diskCount = 2;
final int dataNodeCount = 1;
final int dataNodeIndex = 0;
final int sourceDiskIndex = 0;
final long cap = blockSize * 3L * blockCount;
conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, blockSize);
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.nnTopology(MiniDFSNNTopology.simpleFederatedTopology(2))
.numDataNodes(dataNodeCount)
.storagesPerDatanode(diskCount)
.storageCapacities(new long[] {cap, cap})
.build();
cluster.waitActive();
DFSTestUtil.setFederatedConfiguration(cluster, conf);
final String fileName = "/tmp.txt";
final Path filePath = new Path(fileName);
long fileLen = blockCount * blockSize;
FileSystem fs = cluster.getFileSystem(0);
TestBalancer.createFile(cluster, filePath, fileLen, (short) 1,
0);
DFSTestUtil.waitReplication(fs, filePath, (short) 1);
fs = cluster.getFileSystem(1);
TestBalancer.createFile(cluster, filePath, fileLen, (short) 1,
1);
DFSTestUtil.waitReplication(fs, filePath, (short) 1);
try {
DataMover dataMover = new DataMover(cluster, dataNodeIndex,
sourceDiskIndex, conf, blockSize, blockCount);
dataMover.moveDataToSourceDisk();
NodePlan plan = dataMover.generatePlan();
dataMover.executePlan(plan);
dataMover.verifyPlanExecutionDone();
dataMover.verifyAllVolumesHaveData(true);
dataMover.verifyTolerance(plan, 0, sourceDiskIndex, 10);
} finally {
cluster.shutdown();
}
}
@Test
public void testDiskBalancerComputeDelay() throws Exception {
Configuration conf = new HdfsConfiguration();
conf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true);
final int blockCount = 100;
final int blockSize = 11 * 1024 * 1024;
final int diskCount = 2;
final int dataNodeCount = 1;
final int dataNodeIndex = 0;
final long cap = blockSize * 2L * blockCount;
conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, blockSize);
final MiniDFSCluster cluster = new ClusterBuilder()
.setBlockCount(blockCount).setBlockSize(blockSize)
.setDiskCount(diskCount).setNumDatanodes(dataNodeCount).setConf(conf)
.setCapacities(new long[] {cap, cap }).build();
try {
DataNode node = cluster.getDataNodes().get(dataNodeIndex);
final FsDatasetSpi<?> fsDatasetSpy = Mockito.spy(node.getFSDataset());
DiskBalancerWorkItem item = Mockito.spy(new DiskBalancerWorkItem());
// Mock the bandwidth as 10 MB/s.
Mockito.doReturn((long) 10).when(item).getBandwidth();
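// The spy forwards moveBlockAcrossVolumes() to the underlying dataset so the
// block copy still happens; any exception is only logged here.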
doAnswer(new Answer<Object>() {
public Object answer(InvocationOnMock invocation) {
try {
node.getFSDataset().moveBlockAcrossVolumes(
(ExtendedBlock) invocation.getArguments()[0],
(FsVolumeSpi) invocation.getArguments()[1]);
} catch (Exception e) {
LOG.error(e.getMessage());
}
return null;
}
}).when(fsDatasetSpy).moveBlockAcrossVolumes(any(ExtendedBlock.class),
any(FsVolumeSpi.class));
DiskBalancerMover diskBalancerMover = new DiskBalancerMover(fsDatasetSpy,
conf);
diskBalancerMover.setRunnable();
// bytesCopied - 20 * 1024 * 1024 bytes copied so far.
// timeUsed - 1200 milliseconds already spent copying.
// item - DiskBalancerWorkItem with bandwidth set to 10 MB/s.
// computeDelay returns the sleep delay in milliseconds:
// delay = bytesCopied / (1024 * 1024 * bandwidth in MB/ms) - timeUsed
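// Worked example (matching the assertion below): 10 MB/s is 0.01 MB/ms, so
// copying 20 MB should take 20 / 0.01 = 2000 ms; with 1200 ms already used,
// the expected delay is 2000 - 1200 = 800 ms.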
long val = diskBalancerMover.computeDelay(20 * 1024 * 1024, 1200, item);
Assert.assertEquals(val, (long) 800);
} catch (Exception e) {
Assert.fail("Unexpected exception: " + e);
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
@Test
public void testDiskBalancerWithFedClusterWithOneNameServiceEmpty() throws
Exception {
Configuration conf = new HdfsConfiguration();
conf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true);
final int blockCount = 100;
final int blockSize = 1024;
final int diskCount = 2;
final int dataNodeCount = 1;
final int dataNodeIndex = 0;
final int sourceDiskIndex = 0;
final long cap = blockSize * 3L * blockCount;
conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, blockSize);
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.nnTopology(MiniDFSNNTopology.simpleFederatedTopology(2))
.numDataNodes(dataNodeCount)
.storagesPerDatanode(diskCount)
.storageCapacities(new long[] {cap, cap})
.build();
cluster.waitActive();
DFSTestUtil.setFederatedConfiguration(cluster, conf);
final String fileName = "/tmp.txt";
final Path filePath = new Path(fileName);
long fileLen = blockCount * blockSize;
// Writing data only to one nameservice.
FileSystem fs = cluster.getFileSystem(0);
TestBalancer.createFile(cluster, filePath, fileLen, (short) 1,
0);
DFSTestUtil.waitReplication(fs, filePath, (short) 1);
GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer
.captureLogs(DiskBalancer.LOG);
try {
DataMover dataMover = new DataMover(cluster, dataNodeIndex,
sourceDiskIndex, conf, blockSize, blockCount);
dataMover.moveDataToSourceDisk();
NodePlan plan = dataMover.generatePlan();
dataMover.executePlan(plan);
dataMover.verifyPlanExecutionDone();
// Because one of the nameservices is empty, don't check the block pool count.
dataMover.verifyAllVolumesHaveData(false);
} finally {
String logOut = logCapturer.getOutput();
Assert.assertTrue("Wrong log: " + logOut, logOut.contains(
"NextBlock call returned null. No valid block to copy."));
cluster.shutdown();
}
}
@Test
public void testBalanceDataBetweenMultiplePairsOfVolumes()
throws Exception {
Configuration conf = new HdfsConfiguration();
conf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true);
final int blockCount = 1000;
final int blockSize = 1024;
// Create 3 disks, which means the plan will have 2 move steps:
// disk0->disk1 and disk0->disk2.
final int diskCount = 3;
final int dataNodeCount = 1;
final int dataNodeIndex = 0;
final int sourceDiskIndex = 0;
final long cap = blockSize * 2L * blockCount;
MiniDFSCluster cluster = new ClusterBuilder()
.setBlockCount(blockCount)
.setBlockSize(blockSize)
.setDiskCount(diskCount)
.setNumDatanodes(dataNodeCount)
.setConf(conf)
.setCapacities(new long[] {cap, cap, cap})
.build();
try {
DataMover dataMover = new DataMover(cluster, dataNodeIndex,
sourceDiskIndex, conf, blockSize, blockCount);
dataMover.moveDataToSourceDisk();
NodePlan plan = dataMover.generatePlan();
// With 3 disks the plan should move data to both destination disks,
// so we must have 2 plan steps.
assertEquals(plan.getVolumeSetPlans().size(), 2);
dataMover.executePlan(plan);
dataMover.verifyPlanExecutionDone();
dataMover.verifyAllVolumesHaveData(true);
dataMover.verifyTolerance(plan, 0, sourceDiskIndex, 10);
} finally {
cluster.shutdown();
}
}
/**
* Test disk balancer behavior when one of the disks involved
* in the balancing operation is removed after submitting the plan.
* @throws Exception
*/
@Test
public void testDiskBalancerWhenRemovingVolumes() throws Exception {
Configuration conf = new HdfsConfiguration();
conf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true);
final int blockCount = 100;
final int blockSize = 1024;
final int diskCount = 2;
final int dataNodeCount = 1;
final int dataNodeIndex = 0;
final int sourceDiskIndex = 0;
final long cap = blockSize * 2L * blockCount;
MiniDFSCluster cluster = new ClusterBuilder()
.setBlockCount(blockCount)
.setBlockSize(blockSize)
.setDiskCount(diskCount)
.setNumDatanodes(dataNodeCount)
.setConf(conf)
.setCapacities(new long[] {cap, cap})
.build();
try {
DataMover dataMover = new DataMover(cluster, dataNodeIndex,
sourceDiskIndex, conf, blockSize, blockCount);
dataMover.moveDataToSourceDisk();
NodePlan plan = dataMover.generatePlan();
dataMover.executePlanDuringDiskRemove(plan);
dataMover.verifyAllVolumesHaveData(true);
dataMover.verifyTolerance(plan, 0, sourceDiskIndex, 10);
} catch (Exception e) {
Assert.fail("Unexpected exception: " + e);
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
/**
* Helper class that allows us to create different kinds of MiniDFSClusters
* and populate data.
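* <p>
* Typical usage, mirroring the tests above:
* <pre>{@code
* MiniDFSCluster cluster = new ClusterBuilder()
* .setBlockCount(blockCount)
* .setBlockSize(blockSize)
* .setDiskCount(diskCount)
* .setNumDatanodes(dataNodeCount)
* .setConf(conf)
* .setCapacities(new long[] {cap, cap})
* .build();
* }</pre>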
*/
static class ClusterBuilder {
private Configuration conf;
private int blockSize;
private int numDatanodes;
private int fileLen;
private int blockCount;
private int diskCount;
private long[] capacities;
public ClusterBuilder setConf(Configuration conf) {
this.conf = conf;
return this;
}
public ClusterBuilder setBlockSize(int blockSize) {
this.blockSize = blockSize;
return this;
}
public ClusterBuilder setNumDatanodes(int datanodeCount) {
this.numDatanodes = datanodeCount;
return this;
}
public ClusterBuilder setBlockCount(int blockCount) {
this.blockCount = blockCount;
return this;
}
public ClusterBuilder setDiskCount(int diskCount) {
this.diskCount = diskCount;
return this;
}
private ClusterBuilder setCapacities(final long[] caps) {
this.capacities = caps;
return this;
}
private StorageType[] getStorageTypes(int diskCount) {
Preconditions.checkState(diskCount > 0);
StorageType[] array = new StorageType[diskCount];
for (int x = 0; x < diskCount; x++) {
array[x] = StorageType.DISK;
}
return array;
}
public MiniDFSCluster build() throws IOException, TimeoutException,
InterruptedException {
Preconditions.checkNotNull(this.conf);
Preconditions.checkState(blockSize > 0);
Preconditions.checkState(numDatanodes > 0);
fileLen = blockCount * blockSize;
Preconditions.checkState(fileLen > 0);
conf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true);
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, blockSize);
conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
final String fileName = "/tmp.txt";
Path filePath = new Path(fileName);
fileLen = blockCount * blockSize;
// Write a file and restart the cluster
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(numDatanodes)
.storageCapacities(capacities)
.storageTypes(getStorageTypes(diskCount))
.storagesPerDatanode(diskCount)
.build();
generateData(filePath, cluster);
cluster.restartDataNodes();
cluster.waitActive();
return cluster;
}
private void generateData(Path filePath, MiniDFSCluster cluster)
throws IOException, InterruptedException, TimeoutException {
cluster.waitActive();
FileSystem fs = cluster.getFileSystem(0);
TestBalancer.createFile(cluster, filePath, fileLen, (short) 1,
numDatanodes - 1);
DFSTestUtil.waitReplication(fs, filePath, (short) 1);
cluster.restartDataNodes();
cluster.waitActive();
}
}
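/**
* Helper that packs all data onto one disk of a datanode to create an
* imbalance, then generates and executes a DiskBalancer plan against that
* node and verifies the result. Used by the tests above.
*/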
class DataMover {
private final MiniDFSCluster cluster;
private final int sourceDiskIndex;
private final int dataNodeIndex;
private final Configuration conf;
private final int blockCount;
private final int blockSize;
private DataNode node;
/**
* Constructs a DataMover class.
*
* @param cluster - MiniDFSCluster.
* @param dataNodeIndex - Datanode to operate against.
* @param sourceDiskIndex - source Disk Index.
* @param conf - configuration used by the cluster.
* @param blockSize - block size of the test file.
* @param blockCount - number of blocks in the test file.
*/
public DataMover(MiniDFSCluster cluster, int dataNodeIndex, int
sourceDiskIndex, Configuration conf, int blockSize, int
blockCount) {
this.cluster = cluster;
this.dataNodeIndex = dataNodeIndex;
this.node = cluster.getDataNodes().get(dataNodeIndex);
this.sourceDiskIndex = sourceDiskIndex;
this.conf = conf;
this.blockCount = blockCount;
this.blockSize = blockSize;
}
/**
* Moves all data to the source disk to create a disk imbalance so we can
* run the planner.
*
* @throws IOException
*/
public void moveDataToSourceDisk() throws IOException {
moveAllDataToDestDisk(this.node, sourceDiskIndex);
cluster.restartDataNodes();
cluster.waitActive();
}
/**
* Moves all data in the data node to one disk.
*
* @param dataNode - Datanode
* @param destDiskindex - Index of the destination disk.
*/
private void moveAllDataToDestDisk(DataNode dataNode, int destDiskindex)
throws IOException {
Preconditions.checkNotNull(dataNode);
Preconditions.checkState(destDiskindex >= 0);
try (FsDatasetSpi.FsVolumeReferences refs =
dataNode.getFSDataset().getFsVolumeReferences()) {
if (refs.size() <= destDiskindex) {
throw new IllegalArgumentException("Invalid Disk index.");
}
FsVolumeImpl dest = (FsVolumeImpl) refs.get(destDiskindex);
for (int x = 0; x < refs.size(); x++) {
if (x == destDiskindex) {
continue;
}
FsVolumeImpl source = (FsVolumeImpl) refs.get(x);
DiskBalancerTestUtil.moveAllDataToDestVolume(dataNode.getFSDataset(),
source, dest);
}
}
}
/**
* Generates a NodePlan for the datanode specified.
*
* @return NodePlan.
*/
public NodePlan generatePlan() throws Exception {
// Start up a disk balancer and read the cluster info.
node = cluster.getDataNodes().get(dataNodeIndex);
ClusterConnector nameNodeConnector =
ConnectorFactory.getCluster(cluster.getFileSystem(dataNodeIndex)
.getUri(), conf);
DiskBalancerCluster diskBalancerCluster =
new DiskBalancerCluster(nameNodeConnector);
diskBalancerCluster.readClusterInfo();
List<DiskBalancerDataNode> nodesToProcess = new LinkedList<>();
// Pick a node to process.
nodesToProcess.add(diskBalancerCluster.getNodeByUUID(
node.getDatanodeUuid()));
diskBalancerCluster.setNodesToProcess(nodesToProcess);
// Compute a plan.
List<NodePlan> clusterplan = diskBalancerCluster.computePlan(0.0f);
// Now we must have a plan, since the node is imbalanced and we
// asked the disk balancer to create a plan.
assertTrue(clusterplan.size() == 1);
NodePlan plan = clusterplan.get(0);
plan.setNodeUUID(node.getDatanodeUuid());
plan.setTimeStamp(Time.now());
assertNotNull(plan.getVolumeSetPlans());
assertTrue(plan.getVolumeSetPlans().size() > 0);
plan.getVolumeSetPlans().get(0).setTolerancePercent(10);
return plan;
}
/**
* Submits the plan to the datanode and waits for the execution to finish.
*/
public void executePlan(NodePlan plan) throws
IOException, TimeoutException, InterruptedException {
node = cluster.getDataNodes().get(dataNodeIndex);
String planJson = plan.toJson();
String planID = DigestUtils.sha1Hex(planJson);
// Submit the plan and wait till the execution is done.
node.submitDiskBalancerPlan(planID, 1, PLAN_FILE, planJson,
false);
String jmxString = node.getDiskBalancerStatus();
assertNotNull(jmxString);
DiskBalancerWorkStatus status =
DiskBalancerWorkStatus.parseJson(jmxString);
DiskBalancerWorkStatus realStatus = node.queryDiskBalancerPlan();
assertEquals(realStatus.getPlanID(), status.getPlanID());
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
try {
return node.queryDiskBalancerPlan().getResult() ==
DiskBalancerWorkStatus.Result.PLAN_DONE;
} catch (IOException ex) {
return false;
}
}
}, 1000, 100000);
}
public void executePlanDuringDiskRemove(NodePlan plan) throws
IOException, TimeoutException, InterruptedException {
CountDownLatch createWorkPlanLatch = new CountDownLatch(1);
CountDownLatch removeDiskLatch = new CountDownLatch(1);
AtomicInteger errorCount = new AtomicInteger(0);
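// createWorkPlanLatch is released by the copyBlocks() spy below once plan
// execution has started; removeDiskLatch is released by the reconfiguration
// thread after the data dir has been removed, letting copyBlocks() resume
// while the plan is still running. errorCount tracks failed block moves.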
LOG.info("FSDataSet: " + node.getFSDataset());
final FsDatasetSpi<?> fsDatasetSpy = Mockito.spy(node.getFSDataset());
doAnswer(new Answer<Object>() {
public Object answer(InvocationOnMock invocation) {
try {
node.getFSDataset().moveBlockAcrossVolumes(
(ExtendedBlock)invocation.getArguments()[0],
(FsVolumeSpi) invocation.getArguments()[1]);
} catch (Exception e) {
errorCount.incrementAndGet();
}
return null;
}
}).when(fsDatasetSpy).moveBlockAcrossVolumes(
any(ExtendedBlock.class), any(FsVolumeSpi.class));
DiskBalancerMover diskBalancerMover = new DiskBalancerMover(
fsDatasetSpy, conf);
diskBalancerMover.setRunnable();
DiskBalancerMover diskBalancerMoverSpy = Mockito.spy(diskBalancerMover);
doAnswer(new Answer<Object>() {
public Object answer(InvocationOnMock invocation) {
createWorkPlanLatch.countDown();
LOG.info("Waiting for the disk removal!");
try {
removeDiskLatch.await();
} catch (InterruptedException e) {
LOG.info("Encountered " + e);
}
LOG.info("Got disk removal notification, resuming copyBlocks!");
diskBalancerMover.copyBlocks((VolumePair)(invocation
.getArguments()[0]), (DiskBalancerWorkItem)(invocation
.getArguments()[1]));
return null;
}
}).when(diskBalancerMoverSpy).copyBlocks(
any(VolumePair.class), any(DiskBalancerWorkItem.class));
DiskBalancer diskBalancer = new DiskBalancer(node.getDatanodeUuid(),
conf, diskBalancerMoverSpy);
List<String> oldDirs = new ArrayList<String>(node.getConf().
getTrimmedStringCollection(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY));
final String newDirs = oldDirs.get(0);
LOG.info("Reconfigure newDirs:" + newDirs);
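// Reconfiguring dfs.datanode.data.dir down to just the first directory drops
// the other volume from the datanode while the plan is executing.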
Thread reconfigThread = new Thread() {
public void run() {
try {
LOG.info("Waiting for work plan creation!");
createWorkPlanLatch.await();
LOG.info("Work plan created. Removing disk!");
assertThat(
"DN did not update its own config", node.
reconfigurePropertyImpl(DFS_DATANODE_DATA_DIR_KEY, newDirs),
is(node.getConf().get(DFS_DATANODE_DATA_DIR_KEY)));
Thread.sleep(1000);
LOG.info("Removed disk!");
removeDiskLatch.countDown();
} catch (ReconfigurationException | InterruptedException e) {
Assert.fail("Unexpected error while reconfiguring: " + e);
}
}
};
reconfigThread.start();
String planJson = plan.toJson();
String planID = DigestUtils.sha1Hex(planJson);
diskBalancer.submitPlan(planID, 1, PLAN_FILE, planJson, false);
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
try {
LOG.info("Work Status: " + diskBalancer.
queryWorkStatus().toJsonString());
Result result = diskBalancer.queryWorkStatus().getResult();
return (result == Result.PLAN_DONE);
} catch (IOException e) {
return false;
}
}
}, 1000, 100000);
assertTrue("Disk balancer operation hit max errors!", errorCount.get() <=
DFSConfigKeys.DFS_DISK_BALANCER_MAX_DISK_ERRORS_DEFAULT);
createWorkPlanLatch.await();
removeDiskLatch.await();
}
/**
* Verifies that the plan execution has completed.
*/
public void verifyPlanExecutionDone() throws IOException {
node = cluster.getDataNodes().get(dataNodeIndex);
assertEquals(node.queryDiskBalancerPlan().getResult(),
DiskBalancerWorkStatus.Result.PLAN_DONE);
}
/**
* Once the disk balancer has run, all volumes must have some data.
*/
public void verifyAllVolumesHaveData(boolean checkblockPoolCount) throws
IOException {
node = cluster.getDataNodes().get(dataNodeIndex);
try (FsDatasetSpi.FsVolumeReferences refs =
node.getFSDataset().getFsVolumeReferences()) {
for (FsVolumeSpi volume : refs) {
assertTrue(DiskBalancerTestUtil.getBlockCount(volume, checkblockPoolCount) > 0);
LOG.info("{} : Block Count : {}", volume, DiskBalancerTestUtil
.getBlockCount(volume, checkblockPoolCount));
}
}
}
/**
* Verifies that tolerance values are honored correctly.
*/
public void verifyTolerance(NodePlan plan, int planIndex, int
sourceDiskIndex, int tolerance) throws IOException {
// Tolerance
long delta = (plan.getVolumeSetPlans().get(planIndex).getBytesToMove()
* tolerance) / 100;
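// delta is the allowed slack: 'tolerance' percent of the bytes the plan
// intended to move.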
FsVolumeImpl volume = null;
try (FsDatasetSpi.FsVolumeReferences refs =
node.getFSDataset().getFsVolumeReferences()) {
volume = (FsVolumeImpl) refs.get(sourceDiskIndex);
assertTrue(DiskBalancerTestUtil.getBlockCount(volume, true) > 0);
assertTrue((DiskBalancerTestUtil.getBlockCount(volume, true) *
(blockSize + delta)) >= plan.getVolumeSetPlans().get(0)
.getBytesToMove());
}
}
}
}