/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.hdfs.server.diskbalancer;
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.balancer.TestBalancer;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
import org.apache.hadoop.hdfs.server.diskbalancer.connectors.NullConnector;
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerCluster;
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerDataNode;
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerVolume;
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerVolumeSet;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
/**
* Helper class to create various cluster configurations at run time.
*/
public class DiskBalancerTestUtil {
static final Logger LOG =
LoggerFactory.getLogger(DiskBalancerTestUtil.class);
public static final long MB = 1024 * 1024L;
public static final long GB = MB * 1024L;
public static final long TB = GB * 1024L;
private static int[] diskSizes =
{1, 2, 3, 4, 5, 6, 7, 8, 9, 100, 200, 300, 400, 500, 600, 700, 800, 900};
private Random rand;
private String stringTable =
"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0987654321";
/**
* Constructs a util class.
*/
public DiskBalancerTestUtil() {
this.rand = new Random(Time.monotonicNow());
}
/**
* Returns a random string.
*
* @param length - Number of chars in the string
* @return random String
*/
private String getRandomName(int length) {
StringBuilder name = new StringBuilder();
for (int x = 0; x < length; x++) {
name.append(stringTable.charAt(rand.nextInt(stringTable.length())));
}
return name.toString();
}
/**
* Returns a Random Storage Type.
*
* @return - StorageType
*/
private StorageType getRandomStorageType() {
return StorageType.parseStorageType(rand.nextInt(3));
}
/**
* Returns a random capacity. Sizes smaller than 10 are interpreted as
* TBs, otherwise as GBs.
*
* @return Long - Disk Size
*/
private long getRandomCapacity() {
int size = diskSizes[rand.nextInt(diskSizes.length)];
if (size < 10) {
return size * TB;
} else {
return size * GB;
}
}
/**
* Returns a random reserved size, some value under 20% of the capacity.
*
* @param capacity - Disk capacity in bytes.
* @return reserved size in bytes
*/
private long getRandomReserved(long capacity) {
double rcap = capacity * 0.2d;
double randDouble = rand.nextDouble();
double temp = randDouble * rcap;
return (long) temp;
}
/**
* Returns a random DFS-used size, some value less than
* (capacity - reserved).
*
* @param capacity - Disk capacity in bytes.
* @param reserved - Reserved size in bytes.
* @return used size in bytes
*/
private long getRandomDfsUsed(long capacity, long reserved) {
double rcap = capacity - reserved;
double randDouble = rand.nextDouble();
double temp = randDouble * rcap;
return (long) temp;
}
/**
* Creates a Random Volume of a specific storageType.
*
* @return Volume
*/
public DiskBalancerVolume createRandomVolume() {
return createRandomVolume(getRandomStorageType());
}
/**
* Creates a Random Volume for testing purpose.
*
* @param type - StorageType
* @return DiskBalancerVolume
*/
public DiskBalancerVolume createRandomVolume(StorageType type) {
DiskBalancerVolume volume = new DiskBalancerVolume();
volume.setPath("/tmp/disk/" + getRandomName(10));
volume.setStorageType(type.toString());
volume.setTransient(type.isTransient());
volume.setCapacity(getRandomCapacity());
volume.setReserved(getRandomReserved(volume.getCapacity()));
volume.setUsed(getRandomDfsUsed(volume.getCapacity(), volume.getReserved()));
volume.setUuid(UUID.randomUUID().toString());
return volume;
}
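// Illustrative usage sketch (an assumption for documentation, not code
// exercised by the tests): the generated volume is a pure in-memory data
// model, so no disk I/O happens here.
//
//   DiskBalancerTestUtil util = new DiskBalancerTestUtil();
//   DiskBalancerVolume vol = util.createRandomVolume(StorageType.DISK);
//   assert vol.getUsed() <= vol.getCapacity() - vol.getReserved();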
/**
* Creates a RandomVolumeSet.
*
* @param type - Storage Type
* @param diskCount - How many disks you need.
* @return volumeSet
* @throws Exception
*/
public DiskBalancerVolumeSet createRandomVolumeSet(StorageType type,
int diskCount)
throws Exception {
Preconditions.checkState(diskCount > 0);
DiskBalancerVolumeSet volumeSet =
new DiskBalancerVolumeSet(type.isTransient());
for (int x = 0; x < diskCount; x++) {
volumeSet.addVolume(createRandomVolume(type));
}
Preconditions.checkState(volumeSet.getVolumeCount() == diskCount);
return volumeSet;
}
/**
* Creates a RandomDataNode.
*
* @param diskTypes - Storage types needed in the Node
* @param diskCount - Disk count; that many disks of each type are created.
* @return DataNode
* @throws Exception
*/
public DiskBalancerDataNode createRandomDataNode(StorageType[] diskTypes,
int diskCount)
throws Exception {
Preconditions.checkState(diskTypes.length > 0);
Preconditions.checkState(diskCount > 0);
DiskBalancerDataNode node =
new DiskBalancerDataNode(UUID.randomUUID().toString());
for (StorageType t : diskTypes) {
DiskBalancerVolumeSet vSet = createRandomVolumeSet(t, diskCount);
for (DiskBalancerVolume v : vSet.getVolumes()) {
node.addVolume(v);
}
}
return node;
}
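// Illustrative usage sketch, assuming a test wants one mock datanode with
// two volumes of each listed storage type, i.e. four volumes in total:
//
//   DiskBalancerDataNode node = util.createRandomDataNode(
//       new StorageType[] {StorageType.DISK, StorageType.SSD}, 2);
//   assert node.getVolumeCount() == 4;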
/**
* Creates a RandomCluster.
*
* @param dataNodeCount - How many nodes you need
* @param diskTypes - StorageTypes you need in each node
* @param diskCount - How many disks you need of each type.
* @return Cluster
* @throws Exception
*/
public DiskBalancerCluster createRandCluster(int dataNodeCount,
StorageType[] diskTypes,
int diskCount)
throws Exception {
Preconditions.checkState(diskTypes.length > 0);
Preconditions.checkState(diskCount > 0);
Preconditions.checkState(dataNodeCount > 0);
NullConnector nullConnector = new NullConnector();
DiskBalancerCluster cluster = new DiskBalancerCluster(nullConnector);
// once we add these nodes into the connector, cluster will read them
// from the connector.
for (int x = 0; x < dataNodeCount; x++) {
nullConnector.addNode(createRandomDataNode(diskTypes, diskCount));
}
// with this call we have populated the cluster info
cluster.readClusterInfo();
return cluster;
}
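// Illustrative usage sketch, assuming a test wants a ten-node mock cluster
// with four disks of each listed type per node:
//
//   DiskBalancerCluster dbCluster = util.createRandCluster(10,
//       new StorageType[] {StorageType.DISK, StorageType.RAM_DISK}, 4);
//   assert dbCluster.getNodes().size() == 10;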
/**
* Returns the number of blocks on a volume.
*
* @param source - Source Volume.
* @param checkBlockPoolCount - If true, also asserts that every block
* pool on the volume has at least one block.
* @return Number of Blocks.
* @throws IOException
*/
public static int getBlockCount(FsVolumeSpi source,
boolean checkBlockPoolCount)
throws IOException {
int count = 0;
for (String blockPoolID : source.getBlockPoolList()) {
FsVolumeSpi.BlockIterator sourceIter =
source.newBlockIterator(blockPoolID, "TestDiskBalancerSource");
int blockCount = 0;
while (!sourceIter.atEnd()) {
ExtendedBlock block = sourceIter.nextBlock();
if (block != null) {
blockCount++;
}
}
if (checkBlockPoolCount) {
LOG.info("Block Pool Id: {}, blockCount: {}", blockPoolID, blockCount);
assertTrue(blockCount > 0);
}
count += blockCount;
}
return count;
}
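// Illustrative usage sketch: counting the blocks on the first volume of a
// running MiniDFSCluster datanode. The volume references are a closeable
// resource, hence the try-with-resources block (the same pattern is used
// in newImbalancedCluster below):
//
//   DataNode dn = cluster.getDataNodes().get(0);
//   try (FsDatasetSpi.FsVolumeReferences refs =
//       dn.getFSDataset().getFsVolumeReferences()) {
//     int blocks = getBlockCount(refs.get(0), false);
//   }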
/**
* Creates a MiniDFSCluster with an imbalanced disk layout, using the
* default datanode startup option.
*/
public static MiniDFSCluster newImbalancedCluster(
final Configuration conf,
final int numDatanodes,
final long[] storageCapacities,
final int defaultBlockSize,
final int fileLen)
throws IOException, InterruptedException, TimeoutException {
return newImbalancedCluster(
conf,
numDatanodes,
storageCapacities,
defaultBlockSize,
fileLen,
null);
}
/**
* Creates a MiniDFSCluster where each datanode has two DISK volumes.
* A test file is written, and then all of its blocks are moved off the
* first volume onto the second, leaving the cluster imbalanced.
*/
public static MiniDFSCluster newImbalancedCluster(
final Configuration conf,
final int numDatanodes,
final long[] storageCapacities,
final int defaultBlockSize,
final int fileLen,
final StartupOption dnOption)
throws IOException, InterruptedException, TimeoutException {
conf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true);
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, defaultBlockSize);
conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, defaultBlockSize);
conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
final String fileName = "/" + UUID.randomUUID().toString();
final Path filePath = new Path(fileName);
Preconditions.checkNotNull(storageCapacities);
Preconditions.checkArgument(
storageCapacities.length == 2,
"need to specify capacities for two storages.");
// Write a file and restart the cluster
File basedir = new File(GenericTestUtils.getRandomizedTempPath());
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf, basedir)
.numDataNodes(numDatanodes)
.storageCapacities(storageCapacities)
.storageTypes(new StorageType[]{StorageType.DISK, StorageType.DISK})
.storagesPerDatanode(2)
.dnStartupOption(dnOption)
.build();
cluster.waitActive();
FileSystem fs = cluster.getFileSystem(0);
TestBalancer.createFile(cluster, filePath, fileLen, (short) 1, 0);
DFSTestUtil.waitReplication(fs, filePath, (short) 1);
cluster.restartDataNodes();
cluster.waitActive();
// Get the data node and move all data to one disk.
for (int i = 0; i < numDatanodes; i++) {
DataNode dnNode = cluster.getDataNodes().get(i);
try (FsDatasetSpi.FsVolumeReferences refs =
dnNode.getFSDataset().getFsVolumeReferences()) {
FsVolumeImpl source = (FsVolumeImpl) refs.get(0);
FsVolumeImpl dest = (FsVolumeImpl) refs.get(1);
assertTrue(DiskBalancerTestUtil.getBlockCount(source, true) > 0);
DiskBalancerTestUtil.moveAllDataToDestVolume(dnNode.getFSDataset(),
source, dest);
assertEquals(0, DiskBalancerTestUtil.getBlockCount(source, false));
}
}
cluster.restartDataNodes();
cluster.waitActive();
return cluster;
}
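// Illustrative usage sketch for a disk-balancer test, assuming a 1 MB
// block size, two 500 MB storages per datanode, and a 100-block file:
//
//   Configuration conf = new Configuration();
//   MiniDFSCluster cluster = newImbalancedCluster(
//       conf, 1, new long[] {500 * MB, 500 * MB}, (int) MB,
//       (int) (100 * MB));
//   try {
//     // all blocks now sit on one volume of the single datanode
//   } finally {
//     cluster.shutdown();
//   }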
/**
* Moves all blocks to the destination volume.
*
* @param fsDataset - Dataset
* @param source - Source Volume.
* @param dest - Destination Volume.
* @throws IOException
*/
public static void moveAllDataToDestVolume(FsDatasetSpi fsDataset,
FsVolumeSpi source, FsVolumeSpi dest) throws IOException {
for (String blockPoolID : source.getBlockPoolList()) {
FsVolumeSpi.BlockIterator sourceIter =
source.newBlockIterator(blockPoolID, "TestDiskBalancerSource");
while (!sourceIter.atEnd()) {
ExtendedBlock block = sourceIter.nextBlock();
if (block != null) {
fsDataset.moveBlockAcrossVolumes(block, dest);
}
}
}
}
}