/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs;

import static org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite.ID_UNSPECIFIED;

import java.io.File;
import java.io.FileNotFoundException;
import java.util.*;

import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.*;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.server.blockmanagement.*;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.PathUtils;
import org.junit.Assert;
import org.junit.Test;

/** Test {@link BlockStoragePolicy} */
public class TestBlockStoragePolicy {
  public static final BlockStoragePolicySuite POLICY_SUITE;
  public static final BlockStoragePolicy DEFAULT_STORAGE_POLICY;
  public static final Configuration conf;

  static {
    conf = new HdfsConfiguration();
    conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1);
    POLICY_SUITE = BlockStoragePolicySuite.createDefaultSuite();
    DEFAULT_STORAGE_POLICY = POLICY_SUITE.getDefaultPolicy();
  }

  static final EnumSet<StorageType> none = EnumSet.noneOf(StorageType.class);
  static final EnumSet<StorageType> archive = EnumSet.of(StorageType.ARCHIVE);
  static final EnumSet<StorageType> disk = EnumSet.of(StorageType.DISK);
  static final EnumSet<StorageType> both = EnumSet.of(StorageType.DISK, StorageType.ARCHIVE);

  static final long FILE_LEN = 1024;
  static final short REPLICATION = 3;

  static final byte COLD = (byte) 4;
  static final byte WARM = (byte) 8;
  static final byte HOT  = (byte) 12;

  @Test
  public void testDefaultPolicies() {
    final Map<Byte, String> expectedPolicyStrings = new HashMap<Byte, String>();
    expectedPolicyStrings.put(COLD,
        "BlockStoragePolicy{COLD:4, storageTypes=[ARCHIVE], creationFallbacks=[], replicationFallbacks=[]");
    expectedPolicyStrings.put(WARM,
        "BlockStoragePolicy{WARM:8, storageTypes=[DISK, ARCHIVE], creationFallbacks=[DISK, ARCHIVE], replicationFallbacks=[DISK, ARCHIVE]");
    expectedPolicyStrings.put(HOT,
        "BlockStoragePolicy{HOT:12, storageTypes=[DISK], creationFallbacks=[], replicationFallbacks=[ARCHIVE]");

    for(byte i = 1; i < 16; i++) {
      final BlockStoragePolicy policy = POLICY_SUITE.getPolicy(i); 
      if (policy != null) {
        final String s = policy.toString();
        Assert.assertEquals(expectedPolicyStrings.get(i), s);
      }
    }
    Assert.assertEquals(POLICY_SUITE.getPolicy(HOT), POLICY_SUITE.getDefaultPolicy());
    
    { // check Cold policy
      final BlockStoragePolicy cold = POLICY_SUITE.getPolicy(COLD);
      for(short replication = 1; replication < 6; replication++) {
        final List<StorageType> computed = cold.chooseStorageTypes(replication);
        assertStorageType(computed, replication, StorageType.ARCHIVE);
      }
      assertCreationFallback(cold, null, null, null);
      assertReplicationFallback(cold, null, null, null);
    }
    
    { // check Warm policy
      final BlockStoragePolicy warm = POLICY_SUITE.getPolicy(WARM);
      for(short replication = 1; replication < 6; replication++) {
        final List<StorageType> computed = warm.chooseStorageTypes(replication);
        assertStorageType(computed, replication, StorageType.DISK, StorageType.ARCHIVE);
      }
      assertCreationFallback(warm, StorageType.DISK, StorageType.DISK, StorageType.ARCHIVE);
      assertReplicationFallback(warm, StorageType.DISK, StorageType.DISK, StorageType.ARCHIVE);
    }

    { // check Hot policy
      final BlockStoragePolicy hot = POLICY_SUITE.getPolicy(HOT);
      for(short replication = 1; replication < 6; replication++) {
        final List<StorageType> computed = hot.chooseStorageTypes(replication);
        assertStorageType(computed, replication, StorageType.DISK);
      }
      assertCreationFallback(hot, null, null, null);
      assertReplicationFallback(hot, StorageType.ARCHIVE, null, StorageType.ARCHIVE);
    }
  }

  static StorageType[] newStorageTypes(int nDisk, int nArchive) {
    final StorageType[] t = new StorageType[nDisk + nArchive];
    Arrays.fill(t, 0, nDisk, StorageType.DISK);
    Arrays.fill(t, nDisk, t.length, StorageType.ARCHIVE);
    return t;
  }

  static List<StorageType> asList(int nDisk, int nArchive) {
    return Arrays.asList(newStorageTypes(nDisk, nArchive));
  }

  static void assertStorageType(List<StorageType> computed, short replication,
      StorageType... answers) {
    Assert.assertEquals(replication, computed.size());
    final StorageType last = answers[answers.length - 1];
    for(int i = 0; i < computed.size(); i++) {
      final StorageType expected = i < answers.length? answers[i]: last;
      Assert.assertEquals(expected, computed.get(i));
    }
  }

  static void assertCreationFallback(BlockStoragePolicy policy, StorageType noneExpected,
      StorageType archiveExpected, StorageType diskExpected) {
    Assert.assertEquals(noneExpected, policy.getCreationFallback(none));
    Assert.assertEquals(archiveExpected, policy.getCreationFallback(archive));
    Assert.assertEquals(diskExpected, policy.getCreationFallback(disk));
    Assert.assertEquals(null, policy.getCreationFallback(both));
  }

  static void assertReplicationFallback(BlockStoragePolicy policy, StorageType noneExpected,
      StorageType archiveExpected, StorageType diskExpected) {
    Assert.assertEquals(noneExpected, policy.getReplicationFallback(none));
    Assert.assertEquals(archiveExpected, policy.getReplicationFallback(archive));
    Assert.assertEquals(diskExpected, policy.getReplicationFallback(disk));
    Assert.assertEquals(null, policy.getReplicationFallback(both));
  }

  private static interface CheckChooseStorageTypes {
    public void checkChooseStorageTypes(BlockStoragePolicy p, short replication,
        List<StorageType> chosen, StorageType... expected);

    /** Basic case: pass only replication and chosen */
    static final CheckChooseStorageTypes Basic = new CheckChooseStorageTypes() {
      @Override
      public void checkChooseStorageTypes(BlockStoragePolicy p, short replication,
          List<StorageType> chosen, StorageType... expected) {
        final List<StorageType> types = p.chooseStorageTypes(replication, chosen);
        assertStorageTypes(types, expected);
      }
    };
    
    /** With empty unavailables and isNewBlock=true */
    static final CheckChooseStorageTypes EmptyUnavailablesAndNewBlock
        = new CheckChooseStorageTypes() {
      @Override
      public void checkChooseStorageTypes(BlockStoragePolicy p,
          short replication, List<StorageType> chosen, StorageType... expected) {
        final List<StorageType> types = p.chooseStorageTypes(replication,
            chosen, none, true);
        assertStorageTypes(types, expected);
      }
    };

    /** With empty unavailables and isNewBlock=false */
    static final CheckChooseStorageTypes EmptyUnavailablesAndNonNewBlock
        = new CheckChooseStorageTypes() {
      @Override
      public void checkChooseStorageTypes(BlockStoragePolicy p,
          short replication, List<StorageType> chosen, StorageType... expected) {
        final List<StorageType> types = p.chooseStorageTypes(replication,
            chosen, none, false);
        assertStorageTypes(types, expected);
      }
    };
    
    /** With both DISK and ARCHIVE unavailables and isNewBlock=true */
    static final CheckChooseStorageTypes BothUnavailableAndNewBlock
        = new CheckChooseStorageTypes() {
      @Override
      public void checkChooseStorageTypes(BlockStoragePolicy p,
          short replication, List<StorageType> chosen, StorageType... expected) {
        final List<StorageType> types = p.chooseStorageTypes(replication,
            chosen, both, true);
        assertStorageTypes(types, expected);
      }
    };

    /** With both DISK and ARCHIVE unavailable and isNewBlock=false */
    static final CheckChooseStorageTypes BothUnavailableAndNonNewBlock
        = new CheckChooseStorageTypes() {
      @Override
      public void checkChooseStorageTypes(BlockStoragePolicy p,
          short replication, List<StorageType> chosen, StorageType... expected) {
        final List<StorageType> types = p.chooseStorageTypes(replication,
            chosen, both, false);
        assertStorageTypes(types, expected);
      }
    };

    /** With ARCHIVE unavailable and isNewBlock=true */
    static final CheckChooseStorageTypes ArchivalUnavailableAndNewBlock
        = new CheckChooseStorageTypes() {
      @Override
      public void checkChooseStorageTypes(BlockStoragePolicy p,
          short replication, List<StorageType> chosen, StorageType... expected) {
        final List<StorageType> types = p.chooseStorageTypes(replication,
            chosen, archive, true);
        assertStorageTypes(types, expected);
      }
    };

    /** With ARCHIVE unavailable and isNewBlock=true */
    static final CheckChooseStorageTypes ArchivalUnavailableAndNonNewBlock
        = new CheckChooseStorageTypes() {
      @Override
      public void checkChooseStorageTypes(BlockStoragePolicy p,
          short replication, List<StorageType> chosen, StorageType... expected) {
        final List<StorageType> types = p.chooseStorageTypes(replication,
            chosen, archive, false);
        assertStorageTypes(types, expected);
      }
    };
  }

  @Test
  public void testChooseStorageTypes() {
    run(CheckChooseStorageTypes.Basic);
    run(CheckChooseStorageTypes.EmptyUnavailablesAndNewBlock);
    run(CheckChooseStorageTypes.EmptyUnavailablesAndNonNewBlock);
  }

  private static void run(CheckChooseStorageTypes method) {
    final BlockStoragePolicy hot = POLICY_SUITE.getPolicy(HOT);
    final BlockStoragePolicy warm = POLICY_SUITE.getPolicy(WARM);
    final BlockStoragePolicy cold = POLICY_SUITE.getPolicy(COLD);

    final short replication = 3;
    {
      final List<StorageType> chosen = Lists.newArrayList();
      method.checkChooseStorageTypes(hot, replication, chosen,
          StorageType.DISK, StorageType.DISK, StorageType.DISK);
      method.checkChooseStorageTypes(warm, replication, chosen,
          StorageType.DISK, StorageType.ARCHIVE, StorageType.ARCHIVE);
      method.checkChooseStorageTypes(cold, replication, chosen,
          StorageType.ARCHIVE, StorageType.ARCHIVE, StorageType.ARCHIVE);
    }

    {
      final List<StorageType> chosen = Arrays.asList(StorageType.DISK); 
      method.checkChooseStorageTypes(hot, replication, chosen,
          StorageType.DISK, StorageType.DISK);
      method.checkChooseStorageTypes(warm, replication, chosen,
          StorageType.ARCHIVE, StorageType.ARCHIVE);
      method.checkChooseStorageTypes(cold, replication, chosen,
          StorageType.ARCHIVE, StorageType.ARCHIVE, StorageType.ARCHIVE);
    }

    {
      final List<StorageType> chosen = Arrays.asList(StorageType.ARCHIVE); 
      method.checkChooseStorageTypes(hot, replication, chosen,
          StorageType.DISK, StorageType.DISK, StorageType.DISK);
      method.checkChooseStorageTypes(warm, replication, chosen,
          StorageType.DISK, StorageType.ARCHIVE);
      method.checkChooseStorageTypes(cold, replication, chosen,
          StorageType.ARCHIVE, StorageType.ARCHIVE);
    }

    {
      final List<StorageType> chosen = Arrays.asList(
          StorageType.DISK, StorageType.DISK); 
      method.checkChooseStorageTypes(hot, replication, chosen,
          StorageType.DISK);
      method.checkChooseStorageTypes(warm, replication, chosen,
          StorageType.ARCHIVE, StorageType.ARCHIVE);
      method.checkChooseStorageTypes(cold, replication, chosen,
          StorageType.ARCHIVE, StorageType.ARCHIVE, StorageType.ARCHIVE);
    }

    {
      final List<StorageType> chosen = Arrays.asList(
          StorageType.DISK, StorageType.ARCHIVE); 
      method.checkChooseStorageTypes(hot, replication, chosen,
          StorageType.DISK, StorageType.DISK);
      method.checkChooseStorageTypes(warm, replication, chosen,
          StorageType.ARCHIVE);
      method.checkChooseStorageTypes(cold, replication, chosen,
          StorageType.ARCHIVE, StorageType.ARCHIVE);
    }

    {
      final List<StorageType> chosen = Arrays.asList(
          StorageType.ARCHIVE, StorageType.ARCHIVE); 
      method.checkChooseStorageTypes(hot, replication, chosen,
          StorageType.DISK, StorageType.DISK, StorageType.DISK);
      method.checkChooseStorageTypes(warm, replication, chosen,
          StorageType.DISK);
      method.checkChooseStorageTypes(cold, replication, chosen,
          StorageType.ARCHIVE);
    }

    {
      final List<StorageType> chosen = Arrays.asList(
          StorageType.DISK, StorageType.DISK, StorageType.DISK); 
      method.checkChooseStorageTypes(hot, replication, chosen);
      method.checkChooseStorageTypes(warm, replication, chosen,
          StorageType.ARCHIVE, StorageType.ARCHIVE);
      method.checkChooseStorageTypes(cold, replication, chosen,
          StorageType.ARCHIVE, StorageType.ARCHIVE, StorageType.ARCHIVE);
    }

    {
      final List<StorageType> chosen = Arrays.asList(
          StorageType.DISK, StorageType.DISK, StorageType.ARCHIVE); 
      method.checkChooseStorageTypes(hot, replication, chosen,
          StorageType.DISK);
      method.checkChooseStorageTypes(warm, replication, chosen,
          StorageType.ARCHIVE);
      method.checkChooseStorageTypes(cold, replication, chosen,
          StorageType.ARCHIVE, StorageType.ARCHIVE);
    }

    {
      final List<StorageType> chosen = Arrays.asList(
          StorageType.DISK, StorageType.ARCHIVE, StorageType.ARCHIVE); 
      method.checkChooseStorageTypes(hot, replication, chosen,
          StorageType.DISK, StorageType.DISK);
      method.checkChooseStorageTypes(warm, replication, chosen);
      method.checkChooseStorageTypes(cold, replication, chosen,
          StorageType.ARCHIVE);
    }

    {
      final List<StorageType> chosen = Arrays.asList(
          StorageType.ARCHIVE, StorageType.ARCHIVE, StorageType.ARCHIVE); 
      method.checkChooseStorageTypes(hot, replication, chosen,
          StorageType.DISK, StorageType.DISK, StorageType.DISK);
      method.checkChooseStorageTypes(warm, replication, chosen,
          StorageType.DISK);
      method.checkChooseStorageTypes(cold, replication, chosen);
    }
  }

  @Test
  public void testChooseStorageTypesWithBothUnavailable() {
    runWithBothUnavailable(CheckChooseStorageTypes.BothUnavailableAndNewBlock);
    runWithBothUnavailable(CheckChooseStorageTypes.BothUnavailableAndNonNewBlock);
  }

  private static void runWithBothUnavailable(CheckChooseStorageTypes method) {
    final BlockStoragePolicy hot = POLICY_SUITE.getPolicy(HOT);
    final BlockStoragePolicy warm = POLICY_SUITE.getPolicy(WARM);
    final BlockStoragePolicy cold = POLICY_SUITE.getPolicy(COLD);

    final short replication = 3;
    for(int n = 0; n <= 3; n++) {
      for(int d = 0; d <= n; d++) {
        final int a = n - d;
        final List<StorageType> chosen = asList(d, a);
        method.checkChooseStorageTypes(hot, replication, chosen);
        method.checkChooseStorageTypes(warm, replication, chosen);
        method.checkChooseStorageTypes(cold, replication, chosen);
      }
    }
  }

  @Test
  public void testChooseStorageTypesWithDiskUnavailableAndNewBlock() {
    final BlockStoragePolicy hot = POLICY_SUITE.getPolicy(HOT);
    final BlockStoragePolicy warm = POLICY_SUITE.getPolicy(WARM);
    final BlockStoragePolicy cold = POLICY_SUITE.getPolicy(COLD);

    final short replication = 3;
    final EnumSet<StorageType> unavailables = disk;
    final boolean isNewBlock = true;
    {
      final List<StorageType> chosen = Lists.newArrayList();
      checkChooseStorageTypes(hot, replication, chosen, unavailables, isNewBlock);
      checkChooseStorageTypes(warm, replication, chosen, unavailables, isNewBlock,
          StorageType.ARCHIVE, StorageType.ARCHIVE, StorageType.ARCHIVE);
      checkChooseStorageTypes(cold, replication, chosen, unavailables, isNewBlock,
          StorageType.ARCHIVE, StorageType.ARCHIVE, StorageType.ARCHIVE);
    }

    {
      final List<StorageType> chosen = Arrays.asList(StorageType.DISK); 
      checkChooseStorageTypes(hot, replication, chosen, unavailables, isNewBlock);
      checkChooseStorageTypes(warm, replication, chosen, unavailables, isNewBlock,
          StorageType.ARCHIVE, StorageType.ARCHIVE);
      checkChooseStorageTypes(cold, replication, chosen, unavailables, isNewBlock,
          StorageType.ARCHIVE, StorageType.ARCHIVE, StorageType.ARCHIVE);
    }

    {
      final List<StorageType> chosen = Arrays.asList(StorageType.ARCHIVE); 
      checkChooseStorageTypes(hot, replication, chosen, unavailables, isNewBlock);
      checkChooseStorageTypes(warm, replication, chosen, unavailables, isNewBlock,
          StorageType.ARCHIVE, StorageType.ARCHIVE);
      checkChooseStorageTypes(cold, replication, chosen, unavailables, isNewBlock,
          StorageType.ARCHIVE, StorageType.ARCHIVE);
    }

    {
      final List<StorageType> chosen = Arrays.asList(
          StorageType.DISK, StorageType.DISK); 
      checkChooseStorageTypes(hot, replication, chosen, unavailables, isNewBlock);
      checkChooseStorageTypes(warm, replication, chosen, unavailables, isNewBlock,
          StorageType.ARCHIVE, StorageType.ARCHIVE);
      checkChooseStorageTypes(cold, replication, chosen, unavailables, isNewBlock,
          StorageType.ARCHIVE, StorageType.ARCHIVE, StorageType.ARCHIVE);
    }

    {
      final List<StorageType> chosen = Arrays.asList(
          StorageType.DISK, StorageType.ARCHIVE); 
      checkChooseStorageTypes(hot, replication, chosen, unavailables, isNewBlock);
      checkChooseStorageTypes(warm, replication, chosen, unavailables, isNewBlock,
          StorageType.ARCHIVE);
      checkChooseStorageTypes(cold, replication, chosen, unavailables, isNewBlock,
          StorageType.ARCHIVE, StorageType.ARCHIVE);
    }

    {
      final List<StorageType> chosen = Arrays.asList(
          StorageType.ARCHIVE, StorageType.ARCHIVE); 
      checkChooseStorageTypes(hot, replication, chosen, unavailables, isNewBlock);
      checkChooseStorageTypes(warm, replication, chosen, unavailables, isNewBlock,
          StorageType.ARCHIVE);
      checkChooseStorageTypes(cold, replication, chosen, unavailables, isNewBlock,
          StorageType.ARCHIVE);
    }

    {
      final List<StorageType> chosen = Arrays.asList(
          StorageType.DISK, StorageType.DISK, StorageType.DISK); 
      checkChooseStorageTypes(hot, replication, chosen, unavailables, isNewBlock);
      checkChooseStorageTypes(warm, replication, chosen, unavailables, isNewBlock,
          StorageType.ARCHIVE, StorageType.ARCHIVE);
      checkChooseStorageTypes(cold, replication, chosen, unavailables, isNewBlock,
          StorageType.ARCHIVE, StorageType.ARCHIVE, StorageType.ARCHIVE);
    }

    {
      final List<StorageType> chosen = Arrays.asList(
          StorageType.DISK, StorageType.DISK, StorageType.ARCHIVE); 
      checkChooseStorageTypes(hot, replication, chosen, unavailables, isNewBlock);
      checkChooseStorageTypes(warm, replication, chosen, unavailables, isNewBlock,
          StorageType.ARCHIVE);
      checkChooseStorageTypes(cold, replication, chosen, unavailables, isNewBlock,
          StorageType.ARCHIVE, StorageType.ARCHIVE);
    }

    {
      final List<StorageType> chosen = Arrays.asList(
          StorageType.DISK, StorageType.ARCHIVE, StorageType.ARCHIVE); 
      checkChooseStorageTypes(hot, replication, chosen, unavailables, isNewBlock);
      checkChooseStorageTypes(warm, replication, chosen, unavailables, isNewBlock);
      checkChooseStorageTypes(cold, replication, chosen, unavailables, isNewBlock,
          StorageType.ARCHIVE);
    }

    {
      final List<StorageType> chosen = Arrays.asList(
          StorageType.ARCHIVE, StorageType.ARCHIVE, StorageType.ARCHIVE); 
      checkChooseStorageTypes(hot, replication, chosen, unavailables, isNewBlock);
      checkChooseStorageTypes(warm, replication, chosen, unavailables, isNewBlock);
      checkChooseStorageTypes(cold, replication, chosen, unavailables, isNewBlock);
    }
  }

  @Test
  public void testChooseStorageTypesWithArchiveUnavailable() {
    runWithArchiveUnavailable(CheckChooseStorageTypes.ArchivalUnavailableAndNewBlock);
    runWithArchiveUnavailable(CheckChooseStorageTypes.ArchivalUnavailableAndNonNewBlock);
  }

  private static void runWithArchiveUnavailable(CheckChooseStorageTypes method) {
    final BlockStoragePolicy hot = POLICY_SUITE.getPolicy(HOT);
    final BlockStoragePolicy warm = POLICY_SUITE.getPolicy(WARM);
    final BlockStoragePolicy cold = POLICY_SUITE.getPolicy(COLD);

    final short replication = 3;
    {
      final List<StorageType> chosen = Lists.newArrayList();
      method.checkChooseStorageTypes(hot, replication, chosen,
          StorageType.DISK, StorageType.DISK, StorageType.DISK);
      method.checkChooseStorageTypes(warm, replication, chosen,
          StorageType.DISK, StorageType.DISK, StorageType.DISK);
      method.checkChooseStorageTypes(cold, replication, chosen);
    }

    {
      final List<StorageType> chosen = Arrays.asList(StorageType.DISK); 
      method.checkChooseStorageTypes(hot, replication, chosen,
          StorageType.DISK, StorageType.DISK);
      method.checkChooseStorageTypes(warm, replication, chosen,
          StorageType.DISK, StorageType.DISK);
      method.checkChooseStorageTypes(cold, replication, chosen);
    }

    {
      final List<StorageType> chosen = Arrays.asList(StorageType.ARCHIVE); 
      method.checkChooseStorageTypes(hot, replication, chosen,
          StorageType.DISK, StorageType.DISK, StorageType.DISK);
      method.checkChooseStorageTypes(warm, replication, chosen,
          StorageType.DISK, StorageType.DISK);
      method.checkChooseStorageTypes(cold, replication, chosen);
    }

    {
      final List<StorageType> chosen = Arrays.asList(
          StorageType.DISK, StorageType.DISK); 
      method.checkChooseStorageTypes(hot, replication, chosen,
          StorageType.DISK);
      method.checkChooseStorageTypes(warm, replication, chosen,
          StorageType.DISK);
      method.checkChooseStorageTypes(cold, replication, chosen);
    }

    {
      final List<StorageType> chosen = Arrays.asList(
          StorageType.DISK, StorageType.ARCHIVE); 
      method.checkChooseStorageTypes(hot, replication, chosen,
          StorageType.DISK, StorageType.DISK); 
      method.checkChooseStorageTypes(warm, replication, chosen,
          StorageType.DISK);
      method.checkChooseStorageTypes(cold, replication, chosen);
    }

    {
      final List<StorageType> chosen = Arrays.asList(
          StorageType.ARCHIVE, StorageType.ARCHIVE); 
      method.checkChooseStorageTypes(hot, replication, chosen,
          StorageType.DISK, StorageType.DISK, StorageType.DISK); 
      method.checkChooseStorageTypes(warm, replication, chosen,
          StorageType.DISK);
      method.checkChooseStorageTypes(cold, replication, chosen);
    }

    {
      final List<StorageType> chosen = Arrays.asList(
          StorageType.DISK, StorageType.DISK, StorageType.DISK); 
      method.checkChooseStorageTypes(hot, replication, chosen);
      method.checkChooseStorageTypes(warm, replication, chosen);
      method.checkChooseStorageTypes(cold, replication, chosen);
    }

    {
      final List<StorageType> chosen = Arrays.asList(
          StorageType.DISK, StorageType.DISK, StorageType.ARCHIVE); 
      method.checkChooseStorageTypes(hot, replication, chosen,
          StorageType.DISK);
      method.checkChooseStorageTypes(warm, replication, chosen);
      method.checkChooseStorageTypes(cold, replication, chosen);
    }

    {
      final List<StorageType> chosen = Arrays.asList(
          StorageType.DISK, StorageType.ARCHIVE, StorageType.ARCHIVE); 
      method.checkChooseStorageTypes(hot, replication, chosen,
          StorageType.DISK, StorageType.DISK); 
      method.checkChooseStorageTypes(warm, replication, chosen);
      method.checkChooseStorageTypes(cold, replication, chosen);
    }

    {
      final List<StorageType> chosen = Arrays.asList(
          StorageType.ARCHIVE, StorageType.ARCHIVE, StorageType.ARCHIVE); 
      method.checkChooseStorageTypes(hot, replication, chosen,
          StorageType.DISK, StorageType.DISK, StorageType.DISK);
      method.checkChooseStorageTypes(warm, replication, chosen,
          StorageType.DISK);
      method.checkChooseStorageTypes(cold, replication, chosen);
    }
  }

  @Test
  public void testChooseStorageTypesWithDiskUnavailableAndNonNewBlock() {
    final BlockStoragePolicy hot = POLICY_SUITE.getPolicy(HOT);
    final BlockStoragePolicy warm = POLICY_SUITE.getPolicy(WARM);
    final BlockStoragePolicy cold = POLICY_SUITE.getPolicy(COLD);

    final short replication = 3;
    final EnumSet<StorageType> unavailables = disk;
    final boolean isNewBlock = false;
    {
      final List<StorageType> chosen = Lists.newArrayList();
      checkChooseStorageTypes(hot, replication, chosen, unavailables, isNewBlock,
          StorageType.ARCHIVE, StorageType.ARCHIVE, StorageType.ARCHIVE);
      checkChooseStorageTypes(warm, replication, chosen, unavailables, isNewBlock,
          StorageType.ARCHIVE, StorageType.ARCHIVE, StorageType.ARCHIVE);
      checkChooseStorageTypes(cold, replication, chosen, unavailables, isNewBlock,
          StorageType.ARCHIVE, StorageType.ARCHIVE, StorageType.ARCHIVE);
    }

    {
      final List<StorageType> chosen = Arrays.asList(StorageType.DISK); 
      checkChooseStorageTypes(hot, replication, chosen, unavailables, isNewBlock,
          StorageType.ARCHIVE, StorageType.ARCHIVE);
      checkChooseStorageTypes(warm, replication, chosen, unavailables, isNewBlock,
          StorageType.ARCHIVE, StorageType.ARCHIVE);
      checkChooseStorageTypes(cold, replication, chosen, unavailables, isNewBlock,
          StorageType.ARCHIVE, StorageType.ARCHIVE, StorageType.ARCHIVE);
    }

    {
      final List<StorageType> chosen = Arrays.asList(StorageType.ARCHIVE); 
      checkChooseStorageTypes(hot, replication, chosen, unavailables, isNewBlock,
          StorageType.ARCHIVE, StorageType.ARCHIVE);
      checkChooseStorageTypes(warm, replication, chosen, unavailables, isNewBlock,
          StorageType.ARCHIVE, StorageType.ARCHIVE);
      checkChooseStorageTypes(cold, replication, chosen, unavailables, isNewBlock,
          StorageType.ARCHIVE, StorageType.ARCHIVE);
    }

    {
      final List<StorageType> chosen = Arrays.asList(
          StorageType.DISK, StorageType.DISK); 
      checkChooseStorageTypes(hot, replication, chosen, unavailables, isNewBlock,
          StorageType.ARCHIVE);
      checkChooseStorageTypes(warm, replication, chosen, unavailables, isNewBlock,
          StorageType.ARCHIVE, StorageType.ARCHIVE);
      checkChooseStorageTypes(cold, replication, chosen, unavailables, isNewBlock,
          StorageType.ARCHIVE, StorageType.ARCHIVE, StorageType.ARCHIVE);
    }

    {
      final List<StorageType> chosen = Arrays.asList(
          StorageType.DISK, StorageType.ARCHIVE); 
      checkChooseStorageTypes(hot, replication, chosen, unavailables, isNewBlock,
          StorageType.ARCHIVE);
      checkChooseStorageTypes(warm, replication, chosen, unavailables, isNewBlock,
          StorageType.ARCHIVE);
      checkChooseStorageTypes(cold, replication, chosen, unavailables, isNewBlock,
          StorageType.ARCHIVE, StorageType.ARCHIVE);
    }

    {
      final List<StorageType> chosen = Arrays.asList(
          StorageType.ARCHIVE, StorageType.ARCHIVE); 
      checkChooseStorageTypes(hot, replication, chosen, unavailables, isNewBlock,
          StorageType.ARCHIVE);
      checkChooseStorageTypes(warm, replication, chosen, unavailables, isNewBlock,
          StorageType.ARCHIVE);
      checkChooseStorageTypes(cold, replication, chosen, unavailables, isNewBlock,
          StorageType.ARCHIVE);
    }

    {
      final List<StorageType> chosen = Arrays.asList(
          StorageType.DISK, StorageType.DISK, StorageType.DISK); 
      checkChooseStorageTypes(hot, replication, chosen, unavailables, isNewBlock);
      checkChooseStorageTypes(warm, replication, chosen, unavailables, isNewBlock,
          StorageType.ARCHIVE, StorageType.ARCHIVE);
      checkChooseStorageTypes(cold, replication, chosen, unavailables, isNewBlock,
          StorageType.ARCHIVE, StorageType.ARCHIVE, StorageType.ARCHIVE);
    }

    {
      final List<StorageType> chosen = Arrays.asList(
          StorageType.DISK, StorageType.DISK, StorageType.ARCHIVE); 
      checkChooseStorageTypes(hot, replication, chosen, unavailables, isNewBlock);
      checkChooseStorageTypes(warm, replication, chosen, unavailables, isNewBlock,
          StorageType.ARCHIVE);
      checkChooseStorageTypes(cold, replication, chosen, unavailables, isNewBlock,
          StorageType.ARCHIVE, StorageType.ARCHIVE);
    }

    {
      final List<StorageType> chosen = Arrays.asList(
          StorageType.DISK, StorageType.ARCHIVE, StorageType.ARCHIVE); 
      checkChooseStorageTypes(hot, replication, chosen, unavailables, isNewBlock);
      checkChooseStorageTypes(warm, replication, chosen, unavailables, isNewBlock);
      checkChooseStorageTypes(cold, replication, chosen, unavailables, isNewBlock,
          StorageType.ARCHIVE);
    }

    {
      final List<StorageType> chosen = Arrays.asList(
          StorageType.ARCHIVE, StorageType.ARCHIVE, StorageType.ARCHIVE); 
      checkChooseStorageTypes(hot, replication, chosen, unavailables, isNewBlock);
      checkChooseStorageTypes(warm, replication, chosen, unavailables, isNewBlock);
      checkChooseStorageTypes(cold, replication, chosen, unavailables, isNewBlock);
    }
  }

  static void checkChooseStorageTypes(BlockStoragePolicy p, short replication,
      List<StorageType> chosen, EnumSet<StorageType> unavailables,
      boolean isNewBlock, StorageType... expected) {
    final List<StorageType> types = p.chooseStorageTypes(replication, chosen,
        unavailables, isNewBlock);
    assertStorageTypes(types, expected);
  }

  static void assertStorageTypes(List<StorageType> computed, StorageType... expected) {
    assertStorageTypes(computed.toArray(StorageType.EMPTY_ARRAY), expected);
  }

  static void assertStorageTypes(StorageType[] computed, StorageType... expected) {
    Arrays.sort(expected);
    Arrays.sort(computed);
    Assert.assertArrayEquals(expected, computed);
  }

  @Test
  public void testChooseExcess() {
    final BlockStoragePolicy hot = POLICY_SUITE.getPolicy(HOT);
    final BlockStoragePolicy warm = POLICY_SUITE.getPolicy(WARM);
    final BlockStoragePolicy cold = POLICY_SUITE.getPolicy(COLD);

    final short replication = 3;
    for(int n = 0; n <= 6; n++) {
      for(int d = 0; d <= n; d++) {
        final int a = n - d;
        final List<StorageType> chosen = asList(d, a);
        {
          final int nDisk = Math.max(0, d - replication); 
          final int nArchive = a;
          final StorageType[] expected = newStorageTypes(nDisk, nArchive);
          checkChooseExcess(hot, replication, chosen, expected);
        }

        {
          final int nDisk = Math.max(0, d - 1); 
          final int nArchive = Math.max(0, a - replication + 1);
          final StorageType[] expected = newStorageTypes(nDisk, nArchive);
          checkChooseExcess(warm, replication, chosen, expected);
        }

        {
          final int nDisk = d; 
          final int nArchive = Math.max(0, a - replication );
          final StorageType[] expected = newStorageTypes(nDisk, nArchive);
          checkChooseExcess(cold, replication, chosen, expected);
        }
      }
    }
  }

  static void checkChooseExcess(BlockStoragePolicy p, short replication,
      List<StorageType> chosen, StorageType... expected) {
    final List<StorageType> types = p.chooseExcess(replication, chosen);
    assertStorageTypes(types, expected);
  }

  private void checkDirectoryListing(HdfsFileStatus[] stats, byte... policies) {
    Assert.assertEquals(stats.length, policies.length);
    for (int i = 0; i < stats.length; i++) {
      Assert.assertEquals(stats[i].getStoragePolicy(), policies[i]);
    }
  }

  @Test
  public void testSetStoragePolicy() throws Exception {
    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
        .numDataNodes(REPLICATION).build();
    cluster.waitActive();
    final DistributedFileSystem fs = cluster.getFileSystem();
    try {
      final Path dir = new Path("/testSetStoragePolicy");
      final Path fooFile = new Path(dir, "foo");
      final Path barDir = new Path(dir, "bar");
      final Path barFile1= new Path(barDir, "f1");
      final Path barFile2= new Path(barDir, "f2");
      DFSTestUtil.createFile(fs, fooFile, FILE_LEN, REPLICATION, 0L);
      DFSTestUtil.createFile(fs, barFile1, FILE_LEN, REPLICATION, 0L);
      DFSTestUtil.createFile(fs, barFile2, FILE_LEN, REPLICATION, 0L);

      final String invalidPolicyName = "INVALID-POLICY";
      try {
        fs.setStoragePolicy(fooFile, invalidPolicyName);
        Assert.fail("Should throw a HadoopIllegalArgumentException");
      } catch (RemoteException e) {
        GenericTestUtils.assertExceptionContains(invalidPolicyName, e);
      }

      // check storage policy
      HdfsFileStatus[] dirList = fs.getClient().listPaths(dir.toString(),
          HdfsFileStatus.EMPTY_NAME, true).getPartialListing();
      HdfsFileStatus[] barList = fs.getClient().listPaths(barDir.toString(),
          HdfsFileStatus.EMPTY_NAME, true).getPartialListing();
      checkDirectoryListing(dirList, ID_UNSPECIFIED, ID_UNSPECIFIED);
      checkDirectoryListing(barList, ID_UNSPECIFIED, ID_UNSPECIFIED);

      final Path invalidPath = new Path("/invalidPath");
      try {
        fs.setStoragePolicy(invalidPath, "WARM");
        Assert.fail("Should throw a FileNotFoundException");
      } catch (FileNotFoundException e) {
        GenericTestUtils.assertExceptionContains(invalidPath.toString(), e);
      }

      fs.setStoragePolicy(fooFile, "COLD");
      fs.setStoragePolicy(barDir, "WARM");
      fs.setStoragePolicy(barFile2, "HOT");

      dirList = fs.getClient().listPaths(dir.toString(),
          HdfsFileStatus.EMPTY_NAME).getPartialListing();
      barList = fs.getClient().listPaths(barDir.toString(),
          HdfsFileStatus.EMPTY_NAME).getPartialListing();
      checkDirectoryListing(dirList, WARM, COLD); // bar is warm, foo is cold
      checkDirectoryListing(barList, WARM, HOT);

      // restart namenode to make sure the editlog is correct
      cluster.restartNameNode(true);
      dirList = fs.getClient().listPaths(dir.toString(),
          HdfsFileStatus.EMPTY_NAME, true).getPartialListing();
      barList = fs.getClient().listPaths(barDir.toString(),
          HdfsFileStatus.EMPTY_NAME, true).getPartialListing();
      checkDirectoryListing(dirList, WARM, COLD); // bar is warm, foo is cold
      checkDirectoryListing(barList, WARM, HOT);

      // restart namenode with checkpoint to make sure the fsimage is correct
      fs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
      fs.saveNamespace();
      fs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
      cluster.restartNameNode(true);
      dirList = fs.getClient().listPaths(dir.toString(),
          HdfsFileStatus.EMPTY_NAME).getPartialListing();
      barList = fs.getClient().listPaths(barDir.toString(),
          HdfsFileStatus.EMPTY_NAME).getPartialListing();
      checkDirectoryListing(dirList, WARM, COLD); // bar is warm, foo is cold
      checkDirectoryListing(barList, WARM, HOT);
    } finally {
      cluster.shutdown();
    }
  }

  @Test
  public void testSetStoragePolicyWithSnapshot() throws Exception {
    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
        .numDataNodes(REPLICATION).build();
    cluster.waitActive();
    final DistributedFileSystem fs = cluster.getFileSystem();
    try {
      final Path dir = new Path("/testSetStoragePolicyWithSnapshot");
      final Path fooDir = new Path(dir, "foo");
      final Path fooFile1= new Path(fooDir, "f1");
      final Path fooFile2= new Path(fooDir, "f2");
      DFSTestUtil.createFile(fs, fooFile1, FILE_LEN, REPLICATION, 0L);
      DFSTestUtil.createFile(fs, fooFile2, FILE_LEN, REPLICATION, 0L);

      fs.setStoragePolicy(fooDir, "WARM");

      HdfsFileStatus[] dirList = fs.getClient().listPaths(dir.toString(),
          HdfsFileStatus.EMPTY_NAME, true).getPartialListing();
      checkDirectoryListing(dirList, WARM);
      HdfsFileStatus[] fooList = fs.getClient().listPaths(fooDir.toString(),
          HdfsFileStatus.EMPTY_NAME, true).getPartialListing();
      checkDirectoryListing(fooList, WARM, WARM);

      // take snapshot
      SnapshotTestHelper.createSnapshot(fs, dir, "s1");
      // change the storage policy of fooFile1
      fs.setStoragePolicy(fooFile1, "COLD");

      fooList = fs.getClient().listPaths(fooDir.toString(),
          HdfsFileStatus.EMPTY_NAME).getPartialListing();
      checkDirectoryListing(fooList, COLD, WARM);

      // check the policy for /dir/.snapshot/s1/foo/f1. Note we always return
      // the latest storage policy for a file/directory.
      Path s1f1 = SnapshotTestHelper.getSnapshotPath(dir, "s1", "foo/f1");
      DirectoryListing f1Listing = fs.getClient().listPaths(s1f1.toString(),
          HdfsFileStatus.EMPTY_NAME);
      checkDirectoryListing(f1Listing.getPartialListing(), COLD);

      // delete f1
      fs.delete(fooFile1, true);
      fooList = fs.getClient().listPaths(fooDir.toString(),
          HdfsFileStatus.EMPTY_NAME).getPartialListing();
      checkDirectoryListing(fooList, WARM);
      // check the policy for /dir/.snapshot/s1/foo/f1 again after the deletion
      checkDirectoryListing(fs.getClient().listPaths(s1f1.toString(),
          HdfsFileStatus.EMPTY_NAME).getPartialListing(), COLD);

      // change the storage policy of foo dir
      fs.setStoragePolicy(fooDir, "HOT");
      // /dir/foo is now hot
      dirList = fs.getClient().listPaths(dir.toString(),
          HdfsFileStatus.EMPTY_NAME, true).getPartialListing();
      checkDirectoryListing(dirList, HOT);
      // /dir/foo/f2 is hot
      fooList = fs.getClient().listPaths(fooDir.toString(),
          HdfsFileStatus.EMPTY_NAME).getPartialListing();
      checkDirectoryListing(fooList, HOT);

      // check storage policy of snapshot path
      Path s1 = SnapshotTestHelper.getSnapshotRoot(dir, "s1");
      Path s1foo = SnapshotTestHelper.getSnapshotPath(dir, "s1", "foo");
      checkDirectoryListing(fs.getClient().listPaths(s1.toString(),
          HdfsFileStatus.EMPTY_NAME).getPartialListing(), HOT);
      // /dir/.snapshot/.s1/foo/f1 and /dir/.snapshot/.s1/foo/f2 should still
      // follow the latest
      checkDirectoryListing(fs.getClient().listPaths(s1foo.toString(),
          HdfsFileStatus.EMPTY_NAME).getPartialListing(), COLD, HOT);

      // delete foo
      fs.delete(fooDir, true);
      checkDirectoryListing(fs.getClient().listPaths(s1.toString(),
          HdfsFileStatus.EMPTY_NAME).getPartialListing(), HOT);
      checkDirectoryListing(fs.getClient().listPaths(s1foo.toString(),
          HdfsFileStatus.EMPTY_NAME).getPartialListing(), COLD, HOT);
    } finally {
      cluster.shutdown();
    }
  }

  private static StorageType[][] genStorageTypes(int numDataNodes) {
    StorageType[][] types = new StorageType[numDataNodes][];
    for (int i = 0; i < types.length; i++) {
      types[i] = new StorageType[]{StorageType.DISK, StorageType.ARCHIVE};
    }
    return types;
  }

  private void checkLocatedBlocks(HdfsLocatedFileStatus status, int blockNum,
                                  int replicaNum, StorageType... types) {
    List<StorageType> typeList = Lists.newArrayList();
    Collections.addAll(typeList, types);
    LocatedBlocks lbs = status.getBlockLocations();
    Assert.assertEquals(blockNum, lbs.getLocatedBlocks().size());
    for (LocatedBlock lb : lbs.getLocatedBlocks()) {
      Assert.assertEquals(replicaNum, lb.getStorageTypes().length);
      for (StorageType type : lb.getStorageTypes()) {
        Assert.assertTrue(typeList.remove(type));
      }
    }
    Assert.assertTrue(typeList.isEmpty());
  }

  private void testChangeFileRep(String policyName, byte policyId,
                                   StorageType[] before,
                                   StorageType[] after) throws Exception {
    final int numDataNodes = 5;
    final StorageType[][] types = genStorageTypes(numDataNodes);
    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
        .numDataNodes(numDataNodes).storageTypes(types).build();
    cluster.waitActive();
    final DistributedFileSystem fs = cluster.getFileSystem();
    try {
      final Path dir = new Path("/test");
      fs.mkdirs(dir);
      fs.setStoragePolicy(dir, policyName);

      final Path foo = new Path(dir, "foo");
      DFSTestUtil.createFile(fs, foo, FILE_LEN, REPLICATION, 0L);

      HdfsFileStatus[] status = fs.getClient().listPaths(foo.toString(),
          HdfsFileStatus.EMPTY_NAME, true).getPartialListing();
      checkDirectoryListing(status, policyId);
      HdfsLocatedFileStatus fooStatus = (HdfsLocatedFileStatus) status[0];
      checkLocatedBlocks(fooStatus, 1, 3, before);

      // change the replication factor to 5
      fs.setReplication(foo, (short) numDataNodes);
      Thread.sleep(1000);
      for (DataNode dn : cluster.getDataNodes()) {
        DataNodeTestUtils.triggerHeartbeat(dn);
      }
      Thread.sleep(1000);
      status = fs.getClient().listPaths(foo.toString(),
          HdfsFileStatus.EMPTY_NAME, true).getPartialListing();
      checkDirectoryListing(status, policyId);
      fooStatus = (HdfsLocatedFileStatus) status[0];
      checkLocatedBlocks(fooStatus, 1, numDataNodes, after);

      // change the replication factor back to 3
      fs.setReplication(foo, REPLICATION);
      Thread.sleep(1000);
      for (DataNode dn : cluster.getDataNodes()) {
        DataNodeTestUtils.triggerHeartbeat(dn);
      }
      Thread.sleep(1000);
      for (DataNode dn : cluster.getDataNodes()) {
        DataNodeTestUtils.triggerBlockReport(dn);
      }
      Thread.sleep(1000);
      status = fs.getClient().listPaths(foo.toString(),
          HdfsFileStatus.EMPTY_NAME, true).getPartialListing();
      checkDirectoryListing(status, policyId);
      fooStatus = (HdfsLocatedFileStatus) status[0];
      checkLocatedBlocks(fooStatus, 1, REPLICATION, before);
    } finally {
      cluster.shutdown();
    }
  }

  /**
   * Consider a File with Hot storage policy. Increase replication factor of
   * that file from 3 to 5. Make sure all replications are created in DISKS.
   */
  @Test
  public void testChangeHotFileRep() throws Exception {
    testChangeFileRep("HOT", HOT,
        new StorageType[]{StorageType.DISK, StorageType.DISK,
            StorageType.DISK},
        new StorageType[]{StorageType.DISK, StorageType.DISK, StorageType.DISK,
            StorageType.DISK, StorageType.DISK});
  }

  /**
   * Consider a File with Warm temperature. Increase replication factor of
   * that file from 3 to 5. Make sure all replicas are created in DISKS
   * and ARCHIVE.
   */
  @Test
  public void testChangeWarmRep() throws Exception {
    testChangeFileRep("WARM", WARM,
        new StorageType[]{StorageType.DISK, StorageType.ARCHIVE,
            StorageType.ARCHIVE},
        new StorageType[]{StorageType.DISK, StorageType.ARCHIVE,
            StorageType.ARCHIVE, StorageType.ARCHIVE, StorageType.ARCHIVE});
  }

  /**
   * Consider a File with Cold temperature. Increase replication factor of
   * that file from 3 to 5. Make sure all replicas are created in ARCHIVE.
   */
  @Test
  public void testChangeColdRep() throws Exception {
    testChangeFileRep("COLD", COLD,
        new StorageType[]{StorageType.ARCHIVE, StorageType.ARCHIVE,
            StorageType.ARCHIVE},
        new StorageType[]{StorageType.ARCHIVE, StorageType.ARCHIVE,
            StorageType.ARCHIVE, StorageType.ARCHIVE, StorageType.ARCHIVE});
  }

  @Test
  public void testChooseTargetWithTopology() throws Exception {
    BlockStoragePolicy policy1 = new BlockStoragePolicy((byte) 9, "TEST1",
        new StorageType[]{StorageType.SSD, StorageType.DISK,
            StorageType.ARCHIVE}, new StorageType[]{}, new StorageType[]{});
    BlockStoragePolicy policy2 = new BlockStoragePolicy((byte) 11, "TEST2",
        new StorageType[]{StorageType.DISK, StorageType.SSD,
            StorageType.ARCHIVE}, new StorageType[]{}, new StorageType[]{});

    final String[] racks = {"/d1/r1", "/d1/r2", "/d1/r2"};
    final String[] hosts = {"host1", "host2", "host3"};
    final StorageType[] types = {StorageType.DISK, StorageType.SSD,
        StorageType.ARCHIVE};

    final DatanodeStorageInfo[] storages = DFSTestUtil
        .createDatanodeStorageInfos(3, racks, hosts, types);
    final DatanodeDescriptor[] dataNodes = DFSTestUtil
        .toDatanodeDescriptor(storages);

    FileSystem.setDefaultUri(conf, "hdfs://localhost:0");
    conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "0.0.0.0:0");
    File baseDir = PathUtils.getTestDir(TestReplicationPolicy.class);
    conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY,
        new File(baseDir, "name").getPath());
    DFSTestUtil.formatNameNode(conf);
    NameNode namenode = new NameNode(conf);

    final BlockManager bm = namenode.getNamesystem().getBlockManager();
    BlockPlacementPolicy replicator = bm.getBlockPlacementPolicy();
    NetworkTopology cluster = bm.getDatanodeManager().getNetworkTopology();
    for (DatanodeDescriptor datanode : dataNodes) {
      cluster.add(datanode);
    }

    DatanodeStorageInfo[] targets = replicator.chooseTarget("/foo", 3,
        dataNodes[0], Collections.<DatanodeStorageInfo>emptyList(), false,
        new HashSet<Node>(), 0, policy1);
    System.out.println(Arrays.asList(targets));
    Assert.assertEquals(3, targets.length);
    targets = replicator.chooseTarget("/foo", 3,
        dataNodes[0], Collections.<DatanodeStorageInfo>emptyList(), false,
        new HashSet<Node>(), 0, policy2);
    System.out.println(Arrays.asList(targets));
    Assert.assertEquals(3, targets.length);
  }

  /**
   * Test getting all the storage policies from the namenode
   */
  @Test
  public void testGetAllStoragePolicies() throws Exception {
    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
        .numDataNodes(0).build();
    cluster.waitActive();
    final DistributedFileSystem fs = cluster.getFileSystem();
    try {
      BlockStoragePolicy[] policies = fs.getStoragePolicySuite();
      Assert.assertEquals(3, policies.length);
      Assert.assertEquals(POLICY_SUITE.getPolicy(COLD).toString(),
          policies[0].toString());
      Assert.assertEquals(POLICY_SUITE.getPolicy(WARM).toString(),
          policies[1].toString());
      Assert.assertEquals(POLICY_SUITE.getPolicy(HOT).toString(),
          policies[2].toString());
    } finally {
      IOUtils.cleanup(null, fs);
      cluster.shutdown();
    }
  }
}
