| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hdfs.server.namenode.sps; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.List; |
| import java.util.concurrent.TimeoutException; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.fs.StorageType; |
| import org.apache.hadoop.fs.permission.FsPermission; |
| import org.apache.hadoop.hdfs.DFSConfigKeys; |
| import org.apache.hadoop.hdfs.DFSTestUtil; |
| import org.apache.hadoop.hdfs.DistributedFileSystem; |
| import org.apache.hadoop.hdfs.HdfsConfiguration; |
| import org.apache.hadoop.hdfs.MiniDFSCluster; |
| import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties; |
| import org.apache.hadoop.hdfs.NameNodeProxies; |
| import org.apache.hadoop.hdfs.StripedFileTestUtil; |
| import org.apache.hadoop.hdfs.client.HdfsAdmin; |
| import org.apache.hadoop.hdfs.protocol.ClientProtocol; |
| import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; |
| import org.apache.hadoop.hdfs.protocol.HdfsConstants; |
| import org.apache.hadoop.hdfs.protocol.LocatedBlock; |
| import org.apache.hadoop.hdfs.protocol.LocatedBlocks; |
| import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode; |
| import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector; |
| import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; |
| import org.apache.hadoop.hdfs.server.sps.ExternalSPSContext; |
| import org.apache.hadoop.test.GenericTestUtils; |
| import org.junit.Assert; |
| import org.junit.Before; |
| import org.junit.Test; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import com.google.common.base.Supplier; |
| |
| /** |
| * Tests that StoragePolicySatisfier daemon is able to check the striped blocks |
| * to be moved and finding its expected target locations in order to satisfy the |
| * storage policy. |
| */ |
| public class TestStoragePolicySatisfierWithStripedFile { |
| |
| private static final Logger LOG = LoggerFactory |
| .getLogger(TestStoragePolicySatisfierWithStripedFile.class); |
| |
| private final int stripesPerBlock = 2; |
| |
| private ErasureCodingPolicy ecPolicy; |
| private int dataBlocks; |
| private int parityBlocks; |
| private int cellSize; |
| private int defaultStripeBlockSize; |
| private Configuration conf; |
| private StoragePolicySatisfier sps; |
| private ExternalSPSContext ctxt; |
| private NameNodeConnector nnc; |
| |
| private ErasureCodingPolicy getEcPolicy() { |
| return StripedFileTestUtil.getDefaultECPolicy(); |
| } |
| |
| /** |
| * Initialize erasure coding policy. |
| */ |
| @Before |
| public void init(){ |
| ecPolicy = getEcPolicy(); |
| dataBlocks = ecPolicy.getNumDataUnits(); |
| parityBlocks = ecPolicy.getNumParityUnits(); |
| cellSize = ecPolicy.getCellSize(); |
| defaultStripeBlockSize = cellSize * stripesPerBlock; |
| conf = new HdfsConfiguration(); |
| conf.set(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY, |
| StoragePolicySatisfierMode.EXTERNAL.toString()); |
| // Reduced refresh cycle to update latest datanodes. |
| conf.setLong(DFSConfigKeys.DFS_SPS_DATANODE_CACHE_REFRESH_INTERVAL_MS, |
| 1000); |
| conf.setInt( |
| DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MAX_RETRY_ATTEMPTS_KEY, 30); |
| initConfWithStripe(conf, defaultStripeBlockSize); |
| } |
| |
| /** |
| * Tests to verify that all the striped blocks(data + parity blocks) are |
| * moving to satisfy the storage policy. |
| */ |
| @Test(timeout = 300000) |
| public void testMoverWithFullStripe() throws Exception { |
| // start 11 datanodes |
| int numOfDatanodes = 11; |
| int storagesPerDatanode = 2; |
| long capacity = 20 * defaultStripeBlockSize; |
| long[][] capacities = new long[numOfDatanodes][storagesPerDatanode]; |
| for (int i = 0; i < numOfDatanodes; i++) { |
| for (int j = 0; j < storagesPerDatanode; j++) { |
| capacities[i][j] = capacity; |
| } |
| } |
| |
| final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) |
| .numDataNodes(numOfDatanodes) |
| .storagesPerDatanode(storagesPerDatanode) |
| .storageTypes(new StorageType[][]{ |
| {StorageType.DISK, StorageType.DISK}, |
| {StorageType.DISK, StorageType.DISK}, |
| {StorageType.DISK, StorageType.DISK}, |
| {StorageType.DISK, StorageType.DISK}, |
| {StorageType.DISK, StorageType.DISK}, |
| {StorageType.DISK, StorageType.DISK}, |
| {StorageType.DISK, StorageType.ARCHIVE}, |
| {StorageType.DISK, StorageType.ARCHIVE}, |
| {StorageType.DISK, StorageType.ARCHIVE}, |
| {StorageType.DISK, StorageType.ARCHIVE}, |
| {StorageType.DISK, StorageType.ARCHIVE}}) |
| .storageCapacities(capacities) |
| .build(); |
| |
| HdfsAdmin hdfsAdmin = new HdfsAdmin(FileSystem.getDefaultUri(conf), conf); |
| try { |
| cluster.waitActive(); |
| startSPS(); |
| DistributedFileSystem dfs = cluster.getFileSystem(); |
| dfs.enableErasureCodingPolicy( |
| StripedFileTestUtil.getDefaultECPolicy().getName()); |
| |
| // set "/bar" directory with HOT storage policy. |
| ClientProtocol client = NameNodeProxies.createProxy(conf, |
| cluster.getFileSystem(0).getUri(), ClientProtocol.class).getProxy(); |
| String barDir = "/bar"; |
| client.mkdirs(barDir, new FsPermission((short) 777), true); |
| client.setStoragePolicy(barDir, HdfsConstants.HOT_STORAGE_POLICY_NAME); |
| // set an EC policy on "/bar" directory |
| client.setErasureCodingPolicy(barDir, |
| StripedFileTestUtil.getDefaultECPolicy().getName()); |
| |
| // write file to barDir |
| final String fooFile = "/bar/foo"; |
| long fileLen = cellSize * dataBlocks; |
| DFSTestUtil.createFile(cluster.getFileSystem(), new Path(fooFile), |
| fileLen, (short) 3, 0); |
| |
| // verify storage types and locations |
| LocatedBlocks locatedBlocks = client.getBlockLocations(fooFile, 0, |
| fileLen); |
| for (LocatedBlock lb : locatedBlocks.getLocatedBlocks()) { |
| for (StorageType type : lb.getStorageTypes()) { |
| Assert.assertEquals(StorageType.DISK, type); |
| } |
| } |
| StripedFileTestUtil.verifyLocatedStripedBlocks(locatedBlocks, |
| dataBlocks + parityBlocks); |
| |
| // start 5 more datanodes |
| int numOfNewDatanodes = 5; |
| capacities = new long[numOfNewDatanodes][storagesPerDatanode]; |
| for (int i = 0; i < numOfNewDatanodes; i++) { |
| for (int j = 0; j < storagesPerDatanode; j++) { |
| capacities[i][j] = capacity; |
| } |
| } |
| cluster.startDataNodes(conf, 5, |
| new StorageType[][]{ |
| {StorageType.ARCHIVE, StorageType.ARCHIVE}, |
| {StorageType.ARCHIVE, StorageType.ARCHIVE}, |
| {StorageType.ARCHIVE, StorageType.ARCHIVE}, |
| {StorageType.ARCHIVE, StorageType.ARCHIVE}, |
| {StorageType.ARCHIVE, StorageType.ARCHIVE}}, |
| true, null, null, null, capacities, null, false, false, false, null); |
| cluster.triggerHeartbeats(); |
| |
| // move file to ARCHIVE |
| client.setStoragePolicy(barDir, "COLD"); |
| hdfsAdmin.satisfyStoragePolicy(new Path(fooFile)); |
| LOG.info("Sets storage policy to COLD and invoked satisfyStoragePolicy"); |
| cluster.triggerHeartbeats(); |
| |
| // verify storage types and locations |
| waitExpectedStorageType(cluster, fooFile, fileLen, StorageType.ARCHIVE, 9, |
| 9, 60000); |
| } finally { |
| cluster.shutdown(); |
| sps.stopGracefully(); |
| } |
| } |
| |
| /** |
| * Tests to verify that only few datanodes are available and few striped |
| * blocks are able to move. Others are still trying to find available nodes. |
| * |
| * For example, we have 3 nodes A(disk, disk), B(disk, disk), C(disk, archive) |
| * |
| * Assume a block with storage locations A(disk), B(disk), C(disk). Now, set |
| * policy as COLD and invoked {@link HdfsAdmin#satisfyStoragePolicy(Path)}, |
| * while choosing the target node for A, it shouldn't choose C. For C, it |
| * should do local block movement as it has ARCHIVE storage type. |
| */ |
| @Test(timeout = 300000) |
| public void testWhenOnlyFewTargetNodesAreAvailableToSatisfyStoragePolicy() |
| throws Exception { |
| // start 10 datanodes |
| int numOfDatanodes = 11; |
| int storagesPerDatanode = 2; |
| long capacity = 20 * defaultStripeBlockSize; |
| long[][] capacities = new long[numOfDatanodes][storagesPerDatanode]; |
| for (int i = 0; i < numOfDatanodes; i++) { |
| for (int j = 0; j < storagesPerDatanode; j++) { |
| capacities[i][j] = capacity; |
| } |
| } |
| |
| final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) |
| .numDataNodes(numOfDatanodes) |
| .storagesPerDatanode(storagesPerDatanode) |
| .storageTypes(new StorageType[][]{ |
| {StorageType.DISK, StorageType.DISK}, |
| {StorageType.DISK, StorageType.DISK}, |
| {StorageType.DISK, StorageType.DISK}, |
| {StorageType.DISK, StorageType.DISK}, |
| {StorageType.DISK, StorageType.DISK}, |
| {StorageType.DISK, StorageType.DISK}, |
| {StorageType.DISK, StorageType.DISK}, |
| {StorageType.DISK, StorageType.DISK}, |
| {StorageType.DISK, StorageType.ARCHIVE}, |
| {StorageType.DISK, StorageType.ARCHIVE}, |
| {StorageType.DISK, StorageType.ARCHIVE}}) |
| .storageCapacities(capacities) |
| .build(); |
| |
| HdfsAdmin hdfsAdmin = new HdfsAdmin(FileSystem.getDefaultUri(conf), conf); |
| try { |
| cluster.waitActive(); |
| startSPS(); |
| DistributedFileSystem dfs = cluster.getFileSystem(); |
| dfs.enableErasureCodingPolicy( |
| StripedFileTestUtil.getDefaultECPolicy().getName()); |
| // set "/bar" directory with HOT storage policy. |
| ClientProtocol client = NameNodeProxies.createProxy(conf, |
| cluster.getFileSystem(0).getUri(), ClientProtocol.class).getProxy(); |
| String barDir = "/bar"; |
| client.mkdirs(barDir, new FsPermission((short) 777), true); |
| client.setStoragePolicy(barDir, HdfsConstants.HOT_STORAGE_POLICY_NAME); |
| // set an EC policy on "/bar" directory |
| client.setErasureCodingPolicy(barDir, |
| StripedFileTestUtil.getDefaultECPolicy().getName()); |
| |
| // write file to barDir |
| final String fooFile = "/bar/foo"; |
| long fileLen = cellSize * dataBlocks; |
| DFSTestUtil.createFile(cluster.getFileSystem(), new Path(fooFile), |
| fileLen, (short) 3, 0); |
| |
| // verify storage types and locations |
| LocatedBlocks locatedBlocks = client.getBlockLocations(fooFile, 0, |
| fileLen); |
| for (LocatedBlock lb : locatedBlocks.getLocatedBlocks()) { |
| for (StorageType type : lb.getStorageTypes()) { |
| Assert.assertEquals(StorageType.DISK, type); |
| } |
| } |
| Thread.sleep(5000); |
| StripedFileTestUtil.verifyLocatedStripedBlocks(locatedBlocks, |
| dataBlocks + parityBlocks); |
| |
| // start 2 more datanodes |
| int numOfNewDatanodes = 2; |
| capacities = new long[numOfNewDatanodes][storagesPerDatanode]; |
| for (int i = 0; i < numOfNewDatanodes; i++) { |
| for (int j = 0; j < storagesPerDatanode; j++) { |
| capacities[i][j] = capacity; |
| } |
| } |
| cluster.startDataNodes(conf, 2, |
| new StorageType[][]{ |
| {StorageType.ARCHIVE, StorageType.ARCHIVE}, |
| {StorageType.ARCHIVE, StorageType.ARCHIVE}}, |
| true, null, null, null, capacities, null, false, false, false, null); |
| cluster.triggerHeartbeats(); |
| |
| // Move file to ARCHIVE. Only 5 datanodes are available with ARCHIVE |
| // storage type. |
| client.setStoragePolicy(barDir, "COLD"); |
| hdfsAdmin.satisfyStoragePolicy(new Path(fooFile)); |
| LOG.info("Sets storage policy to COLD and invoked satisfyStoragePolicy"); |
| cluster.triggerHeartbeats(); |
| |
| waitForAttemptedItems(1, 30000); |
| // verify storage types and locations. |
| waitExpectedStorageType(cluster, fooFile, fileLen, StorageType.ARCHIVE, 5, |
| 9, 60000); |
| } finally { |
| cluster.shutdown(); |
| sps.stopGracefully(); |
| } |
| } |
| |
| /** |
| * Test SPS for low redundant file blocks. |
| * 1. Create cluster with 10 datanode. |
| * 1. Create one striped file with default EC Policy. |
| * 2. Set policy and call satisfyStoragePolicy for file. |
| * 3. Stop NameNode and Datanodes. |
| * 4. Start NameNode with 5 datanode and wait for block movement. |
| * 5. Start remaining 5 datanode. |
| * 6. All replica should be moved in proper storage based on policy. |
| */ |
| @Test(timeout = 300000) |
| public void testSPSWhenFileHasLowRedundancyBlocks() throws Exception { |
| // start 9 datanodes |
| int numOfDatanodes = 9; |
| int storagesPerDatanode = 2; |
| long capacity = 20 * defaultStripeBlockSize; |
| long[][] capacities = new long[numOfDatanodes][storagesPerDatanode]; |
| for (int i = 0; i < numOfDatanodes; i++) { |
| for (int j = 0; j < storagesPerDatanode; j++) { |
| capacities[i][j] = capacity; |
| } |
| } |
| |
| conf.set(DFSConfigKeys |
| .DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY, |
| "3000"); |
| conf.set(DFSConfigKeys |
| .DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_KEY, |
| "5000"); |
| final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) |
| .numDataNodes(numOfDatanodes) |
| .storagesPerDatanode(storagesPerDatanode) |
| .storageTypes(new StorageType[][]{ |
| {StorageType.DISK, StorageType.ARCHIVE}, |
| {StorageType.DISK, StorageType.ARCHIVE}, |
| {StorageType.DISK, StorageType.ARCHIVE}, |
| {StorageType.DISK, StorageType.ARCHIVE}, |
| {StorageType.DISK, StorageType.ARCHIVE}, |
| {StorageType.DISK, StorageType.ARCHIVE}, |
| {StorageType.DISK, StorageType.ARCHIVE}, |
| {StorageType.DISK, StorageType.ARCHIVE}, |
| {StorageType.DISK, StorageType.ARCHIVE}}) |
| .storageCapacities(capacities) |
| .build(); |
| try { |
| cluster.waitActive(); |
| startSPS(); |
| DistributedFileSystem fs = cluster.getFileSystem(); |
| fs.enableErasureCodingPolicy( |
| StripedFileTestUtil.getDefaultECPolicy().getName()); |
| Path barDir = new Path("/bar"); |
| fs.mkdirs(barDir); |
| // set an EC policy on "/bar" directory |
| fs.setErasureCodingPolicy(barDir, |
| StripedFileTestUtil.getDefaultECPolicy().getName()); |
| |
| // write file to barDir |
| final Path fooFile = new Path("/bar/foo"); |
| long fileLen = cellSize * dataBlocks; |
| DFSTestUtil.createFile(cluster.getFileSystem(), fooFile, |
| fileLen, (short) 3, 0); |
| |
| // Move file to ARCHIVE. |
| fs.setStoragePolicy(barDir, "COLD"); |
| //Stop DataNodes and restart namenode |
| List<DataNodeProperties> list = new ArrayList<>(numOfDatanodes); |
| for (int i = 0; i < numOfDatanodes; i++) { |
| list.add(cluster.stopDataNode(0)); |
| } |
| cluster.restartNameNodes(); |
| // Restart half datanodes |
| for (int i = 0; i < 5; i++) { |
| cluster.restartDataNode(list.get(i), false); |
| } |
| cluster.waitActive(); |
| fs.satisfyStoragePolicy(fooFile); |
| DFSTestUtil.waitExpectedStorageType(fooFile.toString(), |
| StorageType.ARCHIVE, 5, 30000, cluster.getFileSystem()); |
| //Start remaining datanodes |
| for (int i = numOfDatanodes - 1; i >= 5; i--) { |
| cluster.restartDataNode(list.get(i), false); |
| } |
| cluster.waitActive(); |
| // verify storage types and locations. |
| waitExpectedStorageType(cluster, fooFile.toString(), fileLen, |
| StorageType.ARCHIVE, 9, 9, 60000); |
| } finally { |
| cluster.shutdown(); |
| sps.stopGracefully(); |
| } |
| } |
| |
| |
| /** |
| * Tests to verify that for the given path, no blocks under the given path |
| * will be scheduled for block movement as there are no available datanode |
| * with required storage type. |
| * |
| * For example, there are two block for a file: |
| * |
| * File1 => blk_1[locations=A(DISK),B(DISK),C(DISK)], |
| * blk_2[locations=A(DISK),B(DISK),C(DISK)]. Now, set storage policy to COLD. |
| * No datanode is available with storage type ARCHIVE. |
| * |
| * SPS won't schedule any block movement for this path. |
| */ |
| @Test(timeout = 300000) |
| public void testWhenNoTargetDatanodeToSatisfyStoragePolicy() |
| throws Exception { |
| // start 10 datanodes |
| int numOfDatanodes = 10; |
| int storagesPerDatanode = 2; |
| long capacity = 20 * defaultStripeBlockSize; |
| long[][] capacities = new long[numOfDatanodes][storagesPerDatanode]; |
| for (int i = 0; i < numOfDatanodes; i++) { |
| for (int j = 0; j < storagesPerDatanode; j++) { |
| capacities[i][j] = capacity; |
| } |
| } |
| |
| final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) |
| .numDataNodes(numOfDatanodes) |
| .storagesPerDatanode(storagesPerDatanode) |
| .storageTypes(new StorageType[][]{ |
| {StorageType.DISK, StorageType.DISK}, |
| {StorageType.DISK, StorageType.DISK}, |
| {StorageType.DISK, StorageType.DISK}, |
| {StorageType.DISK, StorageType.DISK}, |
| {StorageType.DISK, StorageType.DISK}, |
| {StorageType.DISK, StorageType.DISK}, |
| {StorageType.DISK, StorageType.DISK}, |
| {StorageType.DISK, StorageType.DISK}, |
| {StorageType.DISK, StorageType.DISK}, |
| {StorageType.DISK, StorageType.DISK}}) |
| .storageCapacities(capacities) |
| .build(); |
| |
| HdfsAdmin hdfsAdmin = new HdfsAdmin(FileSystem.getDefaultUri(conf), conf); |
| try { |
| cluster.waitActive(); |
| startSPS(); |
| DistributedFileSystem dfs = cluster.getFileSystem(); |
| dfs.enableErasureCodingPolicy( |
| StripedFileTestUtil.getDefaultECPolicy().getName()); |
| // set "/bar" directory with HOT storage policy. |
| ClientProtocol client = NameNodeProxies.createProxy(conf, |
| cluster.getFileSystem(0).getUri(), ClientProtocol.class).getProxy(); |
| String barDir = "/bar"; |
| client.mkdirs(barDir, new FsPermission((short) 777), true); |
| client.setStoragePolicy(barDir, HdfsConstants.HOT_STORAGE_POLICY_NAME); |
| // set an EC policy on "/bar" directory |
| client.setErasureCodingPolicy(barDir, |
| StripedFileTestUtil.getDefaultECPolicy().getName()); |
| |
| // write file to barDir |
| final String fooFile = "/bar/foo"; |
| long fileLen = cellSize * dataBlocks; |
| DFSTestUtil.createFile(cluster.getFileSystem(), new Path(fooFile), |
| fileLen, (short) 3, 0); |
| |
| // verify storage types and locations |
| LocatedBlocks locatedBlocks = client.getBlockLocations(fooFile, 0, |
| fileLen); |
| for (LocatedBlock lb : locatedBlocks.getLocatedBlocks()) { |
| for (StorageType type : lb.getStorageTypes()) { |
| Assert.assertEquals(StorageType.DISK, type); |
| } |
| } |
| StripedFileTestUtil.verifyLocatedStripedBlocks(locatedBlocks, |
| dataBlocks + parityBlocks); |
| |
| // Move file to ARCHIVE. Only 5 datanodes are available with ARCHIVE |
| // storage type. |
| client.setStoragePolicy(barDir, "COLD"); |
| hdfsAdmin.satisfyStoragePolicy(new Path(fooFile)); |
| LOG.info("Sets storage policy to COLD and invoked satisfyStoragePolicy"); |
| cluster.triggerHeartbeats(); |
| |
| waitForAttemptedItems(1, 30000); |
| // verify storage types and locations. |
| waitExpectedStorageType(cluster, fooFile, fileLen, StorageType.DISK, 9, 9, |
| 60000); |
| waitForAttemptedItems(1, 30000); |
| } finally { |
| cluster.shutdown(); |
| sps.stopGracefully(); |
| } |
| } |
| |
| private void startSPS() throws IOException { |
| nnc = DFSTestUtil.getNameNodeConnector(conf, |
| HdfsServerConstants.MOVER_ID_PATH, 1, false); |
| |
| sps = new StoragePolicySatisfier(conf); |
| ctxt = new ExternalSPSContext(sps, nnc); |
| sps.init(ctxt); |
| sps.start(true, StoragePolicySatisfierMode.EXTERNAL); |
| } |
| |
| private static void initConfWithStripe(Configuration conf, |
| int stripeBlockSize) { |
| conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, stripeBlockSize); |
| conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L); |
| conf.setLong(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY, |
| 1L); |
| conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_KEY, |
| false); |
| } |
| |
| // Check whether the Block movement has been successfully completed to satisfy |
| // the storage policy for the given file. |
| private void waitExpectedStorageType(MiniDFSCluster cluster, |
| final String fileName, long fileLen, |
| final StorageType expectedStorageType, int expectedStorageCount, |
| int expectedBlkLocationCount, int timeout) throws Exception { |
| GenericTestUtils.waitFor(new Supplier<Boolean>() { |
| @Override |
| public Boolean get() { |
| int actualStorageCount = 0; |
| try { |
| LocatedBlocks locatedBlocks = cluster.getFileSystem().getClient() |
| .getLocatedBlocks(fileName, 0, fileLen); |
| for (LocatedBlock lb : locatedBlocks.getLocatedBlocks()) { |
| LOG.info("LocatedBlocks => Size {}, locs {}", |
| lb.getLocations().length, lb); |
| if (lb.getLocations().length > expectedBlkLocationCount) { |
| return false; |
| } |
| for (StorageType storageType : lb.getStorageTypes()) { |
| if (expectedStorageType == storageType) { |
| actualStorageCount++; |
| } else { |
| LOG.info("Expected storage type {} and actual {}", |
| expectedStorageType, storageType); |
| } |
| } |
| } |
| LOG.info( |
| expectedStorageType + " replica count, expected={} and actual={}", |
| expectedStorageCount, actualStorageCount); |
| } catch (IOException e) { |
| LOG.error("Exception while getting located blocks", e); |
| return false; |
| } |
| return expectedStorageCount == actualStorageCount; |
| } |
| }, 100, timeout); |
| } |
| |
| private void waitForAttemptedItems(long expectedBlkMovAttemptedCount, |
| int timeout) throws TimeoutException, InterruptedException { |
| GenericTestUtils.waitFor(new Supplier<Boolean>() { |
| @Override |
| public Boolean get() { |
| LOG.info("expectedAttemptedItemsCount={} actualAttemptedItemsCount={}", |
| expectedBlkMovAttemptedCount, |
| ((BlockStorageMovementAttemptedItems) (sps |
| .getAttemptedItemsMonitor())).getAttemptedItemsCount()); |
| return ((BlockStorageMovementAttemptedItems) (sps |
| .getAttemptedItemsMonitor())) |
| .getAttemptedItemsCount() == expectedBlkMovAttemptedCount; |
| } |
| }, 100, timeout); |
| } |
| } |