Merge remote-tracking branch 'apache/trunk' into HDFS-7285

Conflicts:
	hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
	hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java

Change-Id: I53ec1c426dc988d6c4a2c87b00caef49c4057010
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index 2af6580..84535d6 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -1098,6 +1098,11 @@
     HADOOP-12440. TestRPC#testRPCServerShutdown did not produce the desired
     thread states before shutting down. (Xiao Chen via mingma)
 
+    HADOOP-12447. Clean up some htrace integration issues (cmccabe)
+
+    HADOOP-12448. TestTextCommand: use mkdirs rather than mkdir to create test
+    directory. (Contributed by Colin Patrick McCabe and Chris Nauroth)
+
   OPTIMIZATIONS
 
     HADOOP-12051. ProtobufRpcEngine.invoke() should use Exception.toString()
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSOutputSummer.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSOutputSummer.java
index 616ff3a..2458b2f 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSOutputSummer.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSOutputSummer.java
@@ -22,7 +22,6 @@
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.htrace.core.TraceScope;
-import org.apache.htrace.core.Tracer;
 
 import java.io.IOException;
 import java.io.OutputStream;
@@ -43,8 +42,6 @@
   private byte checksum[];
   // The number of valid bytes in the buffer.
   private int count;
-  // The HTrace tracer to use
-  private Tracer tracer;
   
   // We want this value to be a multiple of 3 because the native code checksums
   // 3 chunks simultaneously. The chosen value of 9 strikes a balance between
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/GSet.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/GSet.java
index 26e73cf..e4a8d0f 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/GSet.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/GSet.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.util;
 
+import java.util.Collection;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -86,5 +88,17 @@
   */
   E remove(K key);
 
+  /**
+   * Clear the set.
+   */
   void clear();
+
+  /**
+   * Returns a {@link Collection} view of the values contained in this set.
+   * The collection is backed by the set, so changes to the set are
+   * reflected in the collection, and vice-versa.
+   *
+   * @return the collection of values.
+   */
+  Collection<E> values();
 }
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/GSetByHashMap.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/GSetByHashMap.java
index 87488db..e341c74 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/GSetByHashMap.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/GSetByHashMap.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.util;
 
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
 
@@ -70,4 +71,9 @@
   public void clear() {
     m.clear();
   }
+
+  @Override
+  public Collection<E> values() {
+    return m.values();
+  }
 }
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LightWeightGSet.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LightWeightGSet.java
index 1767d85..7c7878a 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LightWeightGSet.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LightWeightGSet.java
@@ -18,12 +18,14 @@
 package org.apache.hadoop.util;
 
 import java.io.PrintStream;
+import java.util.AbstractCollection;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.ConcurrentModificationException;
 import java.util.Iterator;
 
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.util.StringUtils;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -49,12 +51,12 @@
   /**
    * Elements of {@link LightWeightGSet}.
    */
-  public static interface LinkedElement {
+  public interface LinkedElement {
     /** Set the next element. */
-    public void setNext(LinkedElement next);
+    void setNext(LinkedElement next);
 
     /** Get the next element. */
-    public LinkedElement getNext();
+    LinkedElement getNext();
   }
 
   static final int MAX_ARRAY_LENGTH = 1 << 30; //prevent int overflow problem
@@ -64,15 +66,20 @@
    * An internal array of entries, which are the rows of the hash table.
    * The size must be a power of two.
    */
-  private final LinkedElement[] entries;
+  protected LinkedElement[] entries;
   /** A mask for computing the array index from the hash value of an element. */
-  private final int hash_mask;
+  protected int hash_mask;
   /** The size of the set (not the entry array). */
-  private int size = 0;
+  protected int size = 0;
   /** Modification version for fail-fast.
    * @see ConcurrentModificationException
    */
-  private int modification = 0;
+  protected int modification = 0;
+
+  private Collection<E> values;
+
+  protected LightWeightGSet() {
+  }
 
   /**
    * @param recommended_length Recommended size of the internal array.
@@ -87,7 +94,7 @@
   }
 
   //compute actual length
-  private static int actualArrayLength(int recommended) {
+  protected static int actualArrayLength(int recommended) {
     if (recommended > MAX_ARRAY_LENGTH) {
       return MAX_ARRAY_LENGTH;
     } else if (recommended < MIN_ARRAY_LENGTH) {
@@ -103,11 +110,11 @@
     return size;
   }
 
-  private int getIndex(final K key) {
+  protected int getIndex(final K key) {
     return key.hashCode() & hash_mask;
   }
 
-  private E convert(final LinkedElement e){
+  protected E convert(final LinkedElement e){
     @SuppressWarnings("unchecked")
     final E r = (E)e;
     return r;
@@ -138,24 +145,26 @@
 
   @Override
   public E put(final E element) {
-    //validate element
+    // validate element
     if (element == null) {
       throw new NullPointerException("Null element is not supported.");
     }
-    if (!(element instanceof LinkedElement)) {
+    LinkedElement e = null;
+    try {
+      e = (LinkedElement)element;
+    } catch (ClassCastException ex) {
       throw new HadoopIllegalArgumentException(
           "!(element instanceof LinkedElement), element.getClass()="
           + element.getClass());
     }
-    final LinkedElement e = (LinkedElement)element;
 
-    //find index
+    // find index
     final int index = getIndex(element);
 
-    //remove if it already exists
+    // remove if it already exists
     final E existing = remove(index, element);
 
-    //insert the element to the head of the linked list
+    // insert the element to the head of the linked list
     modification++;
     size++;
     e.setNext(entries[index]);
@@ -171,7 +180,7 @@
    * @return If such element exists, return it.
    *         Otherwise, return null.
    */
-  private E remove(final int index, final K key) {
+  protected E remove(final int index, final K key) {
     if (entries[index] == null) {
       return null;
     } else if (entries[index].equals(key)) {
@@ -214,6 +223,38 @@
   }
 
   @Override
+  public Collection<E> values() {
+    if (values == null) {
+      values = new Values();
+    }
+    return values;
+  }
+
+  private final class Values extends AbstractCollection<E> {
+
+    @Override
+    public Iterator<E> iterator() {
+      return LightWeightGSet.this.iterator();
+    }
+
+    @Override
+    public int size() {
+      return size;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public boolean contains(Object o) {
+      return LightWeightGSet.this.contains((K)o);
+    }
+
+    @Override
+    public void clear() {
+      LightWeightGSet.this.clear();
+    }
+  }
+
+  @Override
   public Iterator<E> iterator() {
     return new SetIterator();
   }
@@ -363,9 +404,8 @@
   }
   
   public void clear() {
-    for (int i = 0; i < entries.length; i++) {
-      entries[i] = null;
-    }
+    modification++;
+    Arrays.fill(entries, null);
     size = 0;
   }
 }
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LightWeightResizableGSet.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LightWeightResizableGSet.java
new file mode 100644
index 0000000..0abcf98
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LightWeightResizableGSet.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.util;
+
+import org.apache.hadoop.HadoopIllegalArgumentException;
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * A low memory footprint {@link GSet} implementation,
+ * which uses an array for storing the elements
+ * and linked lists for collision resolution.
+ *
+ * If the size of elements exceeds the threshold,
+ * the internal array will be resized to double length.
+ *
+ * This class does not support null element.
+ *
+ * This class is not thread safe.
+ *
+ * @param <K> Key type for looking up the elements
+ * @param <E> Element type, which must be
+ *       (1) a subclass of K, and
+ *       (2) implementing {@link LinkedElement} interface.
+ */
+@InterfaceAudience.Private
+public class LightWeightResizableGSet<K, E extends K>
+    extends LightWeightGSet<K, E> {
+
+  /**
+   * The default initial capacity - MUST be a power of two.
+   */
+  static final int DEFAULT_INITIAL_CAPACITY = 1 << 4;
+
+  /**
+   * The load factor used when none specified in constructor.
+   */
+  static final float DEFAULT_LOAD_FACTOR = 0.75f;
+
+  /** Size of the entry table. */
+  private int capacity;
+
+  /**
+   * The load factor for the hash set.
+   */
+  private final float loadFactor;
+  private int threshold;
+
+  public LightWeightResizableGSet(int initCapacity, float loadFactor) {
+    if (initCapacity < 0) {
+      throw new HadoopIllegalArgumentException("Illegal initial capacity: " +
+          initCapacity);
+    }
+    if (loadFactor <= 0 || loadFactor > 1.0f) {
+      throw new HadoopIllegalArgumentException("Illegal load factor: " +
+          loadFactor);
+    }
+    this.capacity = actualArrayLength(initCapacity);
+    this.hash_mask = capacity - 1;
+    this.loadFactor = loadFactor;
+    this.threshold = (int) (capacity * loadFactor);
+
+    entries = new LinkedElement[capacity];
+  }
+
+  public LightWeightResizableGSet() {
+    this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR);
+  }
+
+  public LightWeightResizableGSet(int initCapacity) {
+    this(initCapacity, DEFAULT_LOAD_FACTOR);
+  }
+
+  @Override
+  public E put(final E element) {
+    E existing = super.put(element);
+    expandIfNecessary();
+    return existing;
+  }
+
+  /**
+   * Resize the internal table to given capacity.
+   */
+  @SuppressWarnings("unchecked")
+  protected void resize(int cap) {
+    int newCapacity = actualArrayLength(cap);
+    if (newCapacity == this.capacity) {
+      return;
+    }
+    this.capacity = newCapacity;
+    this.threshold = (int) (capacity * loadFactor);
+    this.hash_mask = capacity - 1;
+    LinkedElement[] oldEntries = entries;
+    entries = new LinkedElement[capacity];
+    for (int i = 0; i < oldEntries.length; i++) {
+      LinkedElement e = oldEntries[i];
+      while (e != null) {
+        LinkedElement next = e.getNext();
+        int index = getIndex((E)e);
+        e.setNext(entries[index]);
+        entries[index] = e;
+        e = next;
+      }
+    }
+  }
+
+  /**
+   * Checks if we need to expand, and expands if necessary.
+   */
+  protected void expandIfNecessary() {
+    if (size > this.threshold && capacity < MAX_ARRAY_LENGTH) {
+      resize(capacity * 2);
+    }
+  }
+}
\ No newline at end of file
diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/Tracing.md b/hadoop-common-project/hadoop-common/src/site/markdown/Tracing.md
index 7897855..4cc6a07 100644
--- a/hadoop-common-project/hadoop-common/src/site/markdown/Tracing.md
+++ b/hadoop-common-project/hadoop-common/src/site/markdown/Tracing.md
@@ -85,8 +85,8 @@
 You need to specify the class name of span receiver as argument of `-class` option.
 You can specify the configuration associated with span receiver by `-Ckey=value` options.
 
-      $ hadoop trace -add -class LocalFileSpanReceiver -Cdfs.htrace.local-file-span-receiver.path=/tmp/htrace.out -host 192.168.56.2:9000
-      Added trace span receiver 2 with configuration dfs.htrace.local-file-span-receiver.path = /tmp/htrace.out
+      $ hadoop trace -add -class org.apache.htrace.core.LocalFileSpanReceiver -Chadoop.htrace.local.file.span.receiver.path=/tmp/htrace.out -host 192.168.56.2:9000
+      Added trace span receiver 2 with configuration hadoop.htrace.local.file.span.receiver.path = /tmp/htrace.out
 
       $ hadoop trace -list -host 192.168.56.2:9000
       ID  CLASS
@@ -137,8 +137,7 @@
         FsShell shell = new FsShell();
         conf.setQuietMode(false);
         shell.setConf(conf);
-        Tracer tracer = new Tracer.Builder().
-            name("TracingFsShell).
+        Tracer tracer = new Tracer.Builder("TracingFsShell").
             conf(TraceUtils.wrapHadoopConf("tracing.fs.shell.htrace.", conf)).
             build();
         int res = 0;
@@ -177,15 +176,15 @@
 
 ```xml
       <property>
-        <name>dfs.client.htrace.spanreceiver.classes</name>
+        <name>hadoop.htrace.span.receiver.classes</name>
         <value>LocalFileSpanReceiver</value>
       </property>
       <property>
-        <name>dfs.client.htrace.sampler</name>
+        <name>fs.client.htrace.sampler.classes</name>
         <value>ProbabilitySampler</value>
       </property>
       <property>
-        <name>dfs.client.htrace.sampler.fraction</name>
-        <value>0.5</value>
+        <name>fs.client.htrace.sampler.fraction</name>
+        <value>0.01</value>
       </property>
 ```
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestTextCommand.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestTextCommand.java
index 70a2f03..0e33d6a 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestTextCommand.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestTextCommand.java
@@ -22,11 +22,13 @@
 
 import java.io.File;
 import java.io.FileOutputStream;
-import java.io.InputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.StringWriter;
 import java.lang.reflect.Method;
 import java.net.URI;
+import java.nio.file.Files;
+import java.nio.file.Paths;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -38,12 +40,13 @@
  * by the Text command.
  */
 public class TestTextCommand {
-  private static final String TEST_ROOT_DIR =
-    System.getProperty("test.build.data", "build/test/data/") + "/testText";
+  private static final File TEST_ROOT_DIR =
+    Paths.get(System.getProperty("test.build.data", "build/test/data"),
+        "testText").toFile();
   private static final String AVRO_FILENAME =
-    new Path(TEST_ROOT_DIR, "weather.avro").toUri().getPath();
+    new File(TEST_ROOT_DIR, "weather.avro").toURI().getPath();
   private static final String TEXT_FILENAME =
-    new Path(TEST_ROOT_DIR, "testtextfile.txt").toUri().getPath();
+    new File(TEST_ROOT_DIR, "testtextfile.txt").toURI().getPath();
 
   /**
    * Tests whether binary Avro data files are displayed correctly.
@@ -126,7 +129,7 @@
   }
 
   private void createFile(String fileName, byte[] contents) throws IOException {
-    (new File(TEST_ROOT_DIR)).mkdir();
+    Files.createDirectories(TEST_ROOT_DIR.toPath());
     File file = new File(fileName);
     file.createNewFile();
     FileOutputStream stream = new FileOutputStream(file);
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestGSet.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestGSet.java
index af880ee..2d39f3d 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestGSet.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestGSet.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.util;
 
+import java.util.Collection;
 import java.util.ConcurrentModificationException;
 import java.util.Iterator;
 import java.util.Random;
@@ -41,10 +42,15 @@
 
   @Test
   public void testExceptionCases() {
+    testExceptionCases(false);
+    testExceptionCases(true);
+  }
+
+  private void testExceptionCases(boolean resizable) {
     {
       //test contains
       final LightWeightGSet<Integer, Integer> gset
-        = new LightWeightGSet<Integer, Integer>(16);
+        = createGSet(16, resizable);
       try {
         //test contains with a null element
         gset.contains(null);
@@ -57,7 +63,7 @@
     {
       //test get
       final LightWeightGSet<Integer, Integer> gset
-        = new LightWeightGSet<Integer, Integer>(16);
+        = createGSet(16, resizable);
       try {
         //test get with a null element
         gset.get(null);
@@ -70,7 +76,7 @@
     {
       //test put
       final LightWeightGSet<Integer, Integer> gset
-        = new LightWeightGSet<Integer, Integer>(16);
+        = createGSet(16, resizable);
       try {
         //test put with a null element
         gset.put(null);
@@ -97,7 +103,7 @@
       for(int v = 1; v < data.length-1; v++) {
         {
           //test remove while iterating
-          final GSet<IntElement, IntElement> gset = createGSet(data);
+          final GSet<IntElement, IntElement> gset = createGSet(data, resizable);
           for(IntElement i : gset) {
             if (i.value == v) {
               //okay because data[0] is not in gset
@@ -120,7 +126,7 @@
 
         {
           //test put new element while iterating
-          final GSet<IntElement, IntElement> gset = createGSet(data);
+          final GSet<IntElement, IntElement> gset = createGSet(data, resizable);
           try {
             for(IntElement i : gset) {
               if (i.value == v) {
@@ -135,7 +141,7 @@
 
         {
           //test put existing element while iterating
-          final GSet<IntElement, IntElement> gset = createGSet(data);
+          final GSet<IntElement, IntElement> gset = createGSet(data, resizable);
           try {
             for(IntElement i : gset) {
               if (i.value == v) {
@@ -151,9 +157,17 @@
     }
   }
 
-  private static GSet<IntElement, IntElement> createGSet(final IntElement[] data) {
+  private static LightWeightGSet<Integer, Integer> createGSet(
+      int size, boolean resizable) {
+    return resizable ? new LightWeightResizableGSet<Integer, Integer>(size) :
+      new LightWeightGSet<Integer, Integer>(size);
+  }
+
+  private static GSet<IntElement, IntElement> createGSet(
+      final IntElement[] data, boolean resizable) {
     final GSet<IntElement, IntElement> gset
-      = new LightWeightGSet<IntElement, IntElement>(8);
+      = resizable ? new LightWeightResizableGSet<IntElement, IntElement>(8) :
+        new LightWeightGSet<IntElement, IntElement>(8);
     for(int i = 1; i < data.length; i++) {
       gset.put(data[i]);
     }
@@ -168,6 +182,14 @@
     check(new GSetTestCase(255, 1 << 10, 65537));
   }
 
+  @Test
+  public void testResizableGSet() {
+    //The parameters are: table length, data size, modulus, resizable.
+    check(new GSetTestCase(1, 1 << 4, 65537, true));
+    check(new GSetTestCase(17, 1 << 16, 17, true));
+    check(new GSetTestCase(255, 1 << 10, 65537, true));
+  }
+
   /**
    * A long running test with various data sets and parameters.
    * It may take ~5 hours, 
@@ -177,14 +199,25 @@
   //@Test
   public void runMultipleTestGSet() {
     for(int offset = -2; offset <= 2; offset++) {
-      runTestGSet(1, offset);
+      runTestGSet(1, offset, false);
       for(int i = 1; i < Integer.SIZE - 1; i++) {
-        runTestGSet((1 << i) + 1, offset);
+        runTestGSet((1 << i) + 1, offset, false);
       }
     }
   }
 
-  private static void runTestGSet(final int modulus, final int offset) {
+  //@Test
+  public void runMultipleTestResizableGSet() {
+    for(int offset = -2; offset <= 2; offset++) {
+      runTestGSet(1, offset, true);
+      for(int i = 1; i < Integer.SIZE - 1; i++) {
+        runTestGSet((1 << i) + 1, offset, true);
+      }
+    }
+  }
+
+  private static void runTestGSet(final int modulus, final int offset,
+      boolean resizable) {
     println("\n\nmodulus=" + modulus + ", offset=" + offset);
     for(int i = 0; i <= 16; i += 4) {
       final int tablelength = (1 << i) + offset;
@@ -194,7 +227,7 @@
 
       for(int j = 0; j <= upper; j += steps) {
         final int datasize = 1 << j;
-        check(new GSetTestCase(tablelength, datasize, modulus));
+        check(new GSetTestCase(tablelength, datasize, modulus, resizable));
       }
     }
   }
@@ -265,6 +298,10 @@
     int contain_count = 0;
 
     GSetTestCase(int tablelength, int datasize, int modulus) {
+      this(tablelength, datasize, modulus, false);
+    }
+
+    GSetTestCase(int tablelength, int datasize, int modulus, boolean resizable) {
       denominator = Math.min((datasize >> 7) + 1, 1 << 16);
       info = getClass().getSimpleName()
           + ": tablelength=" + tablelength
@@ -274,7 +311,8 @@
       println(info);
 
       data  = new IntData(datasize, modulus);
-      gset = new LightWeightGSet<IntElement, IntElement>(tablelength);
+      gset = resizable ? new LightWeightResizableGSet<IntElement, IntElement>() :
+        new LightWeightGSet<IntElement, IntElement>(tablelength);
 
       Assert.assertEquals(0, gset.size());
     }
@@ -392,6 +430,11 @@
       gset.clear();
       Assert.assertEquals(0, size());
     }
+
+    @Override
+    public Collection<IntElement> values() {
+      throw new UnsupportedOperationException();
+    }
   }
 
   /** Test data set */
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestLightWeightCache.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestLightWeightCache.java
index 68d484f..dff6937 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestLightWeightCache.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestLightWeightCache.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.util;
 
+import java.util.Collection;
 import java.util.Date;
 import java.util.Iterator;
 import java.util.Random;
@@ -379,6 +380,11 @@
       cache.clear();
       Assert.assertEquals(0, size());
     }
+
+    @Override
+    public Collection<IntEntry> values() {
+      throw new UnsupportedOperationException();
+    }
   }
 
   private static class IntData {
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestLightWeightResizableGSet.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestLightWeightResizableGSet.java
new file mode 100644
index 0000000..3250092
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestLightWeightResizableGSet.java
@@ -0,0 +1,252 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.util;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Random;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+/** Testing {@link LightWeightResizableGSet} */
+public class TestLightWeightResizableGSet {
+  public static final Log LOG = LogFactory.getLog(TestLightWeightResizableGSet.class);
+  private Random random = new Random();
+
+  private TestElement[] generateElements(int length) {
+    TestElement[] elements = new TestElement[length];
+    Set<Long> keys = new HashSet<>();
+    long k = 0;
+    for (int i = 0; i < length; i++) {
+      while (keys.contains(k = random.nextLong()));
+      elements[i] = new TestElement(k, random.nextLong());
+      keys.add(k);
+    }
+    return elements;
+  }
+
+  private TestKey[] getKeys(TestElement[] elements) {
+    TestKey[] keys = new TestKey[elements.length];
+    for (int i = 0; i < elements.length; i++) {
+      keys[i] = new TestKey(elements[i].getKey());
+    }
+    return keys;
+  }
+
+  private TestElement[] generateElements(TestKey[] keys) {
+    TestElement[] elements = new TestElement[keys.length];
+    for (int i = 0; i < keys.length; i++) {
+      elements[i] = new TestElement(keys[i], random.nextLong());
+    }
+    return elements;
+  }
+
+  private static class TestKey {
+    private final long key;
+
+    TestKey(long key) {
+      this.key = key;
+    }
+
+    TestKey(TestKey other) {
+      this.key = other.key;
+    }
+
+    long getKey() {
+      return key;
+    }
+
+    @Override
+    public int hashCode() {
+      return (int)(key^(key>>>32));
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (!(o instanceof TestKey)) {
+        return false;
+      }
+      TestKey other = (TestKey)o;
+      return key == other.key;
+    }
+  }
+
+  private static class TestElement extends TestKey
+      implements LightWeightResizableGSet.LinkedElement {
+    private final long data;
+    private LightWeightResizableGSet.LinkedElement next;
+
+    TestElement(long key, long data) {
+      super(key);
+      this.data = data;
+    }
+
+    TestElement(TestKey key, long data) {
+      super(key);
+      this.data = data;
+    }
+
+    long getData() {
+      return data;
+    }
+
+    @Override
+    public void setNext(LightWeightResizableGSet.LinkedElement next) {
+      this.next = next;
+    }
+
+    @Override
+    public LightWeightResizableGSet.LinkedElement getNext() {
+      return next;
+    }
+  }
+
+  @Test(timeout = 60000)
+  public void testBasicOperations() {
+    TestElement[] elements = generateElements(1 << 16);
+    final LightWeightResizableGSet<TestKey, TestElement> set =
+        new LightWeightResizableGSet<TestKey, TestElement>();
+
+    assertEquals(set.size(), 0);
+
+    // put all elements
+    for (int i = 0; i < elements.length; i++) {
+      TestElement element = set.put(elements[i]);
+      assertTrue(element == null);
+    }
+
+    // check the set size
+    assertEquals(set.size(), elements.length);
+
+    // check all elements exist in the set and the data is correct
+    for (int i = 0; i < elements.length; i++) {
+      assertTrue(set.contains(elements[i]));
+
+      TestElement element = set.get(elements[i]);
+      assertEquals(elements[i].getData(), element.getData());
+    }
+
+    TestKey[] keys = getKeys(elements);
+    // generate new elements with same key, but new data
+    TestElement[] newElements = generateElements(keys);
+    // update the set
+    for (int i = 0; i < newElements.length; i++) {
+      TestElement element = set.put(newElements[i]);
+      assertTrue(element != null);
+    }
+
+    // check the set size
+    assertEquals(set.size(), elements.length);
+
+    // check all elements exist in the set and the data is updated to new value
+    for (int i = 0; i < keys.length; i++) {
+      assertTrue(set.contains(keys[i]));
+
+      TestElement element = set.get(keys[i]);
+      assertEquals(newElements[i].getData(), element.getData());
+    }
+
+    // test LightWeightHashGSet#values
+    Collection<TestElement> cElements = set.values();
+    assertEquals(cElements.size(), elements.length);
+    for (TestElement element : cElements) {
+      assertTrue(set.contains(element));
+    }
+
+    // remove elements
+    for (int i = 0; i < keys.length; i++) {
+      TestElement element = set.remove(keys[i]);
+
+      assertTrue(element != null);
+
+      // the element should not exist after remove
+      assertFalse(set.contains(keys[i]));
+    }
+
+    // check the set size
+    assertEquals(set.size(), 0);
+  }
+
+  @Test(timeout = 60000)
+  public void testRemoveAll() {
+    TestElement[] elements = generateElements(1 << 16);
+    final LightWeightResizableGSet<TestKey, TestElement> set =
+        new LightWeightResizableGSet<TestKey, TestElement>();
+
+    assertEquals(set.size(), 0);
+
+    // put all elements
+    for (int i = 0; i < elements.length; i++) {
+      TestElement element = set.put(elements[i]);
+      assertTrue(element == null);
+    }
+
+    // check the set size
+    assertEquals(set.size(), elements.length);
+
+    // remove all through clear
+    {
+      set.clear();
+      assertEquals(set.size(), 0);
+
+      // check all elements removed
+      for (int i = 0; i < elements.length; i++) {
+        assertFalse(set.contains(elements[i]));
+      }
+      assertFalse(set.iterator().hasNext());
+    }
+
+    // put all elements back
+    for (int i = 0; i < elements.length; i++) {
+      TestElement element = set.put(elements[i]);
+      assertTrue(element == null);
+    }
+
+    // remove all through iterator
+    {
+      for (Iterator<TestElement> iter = set.iterator(); iter.hasNext(); ) {
+        TestElement element = iter.next();
+        // element should be there before removing
+        assertTrue(set.contains(element));
+        iter.remove();
+        // element should not be there now
+        assertFalse(set.contains(element));
+      }
+
+      // the deleted elements should not be there
+      for (int i = 0; i < elements.length; i++) {
+        assertFalse(set.contains(elements[i]));
+      }
+
+      // iterator should not have next
+      assertFalse(set.iterator().hasNext());
+
+      // check the set size
+      assertEquals(set.size(), 0);
+    }
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
index 4f37090..f249692 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
@@ -349,17 +349,13 @@
       if (clientContext.getUseLegacyBlockReaderLocal()) {
         reader = getLegacyBlockReaderLocal();
         if (reader != null) {
-          if (LOG.isTraceEnabled()) {
-            LOG.trace(this + ": returning new legacy block reader local.");
-          }
+          LOG.trace("{}: returning new legacy block reader local.", this);
           return reader;
         }
       } else {
         reader = getBlockReaderLocal();
         if (reader != null) {
-          if (LOG.isTraceEnabled()) {
-            LOG.trace(this + ": returning new block reader local.");
-          }
+          LOG.trace("{}: returning new block reader local.", this);
           return reader;
         }
       }
@@ -367,10 +363,8 @@
     if (scConf.isDomainSocketDataTraffic()) {
       reader = getRemoteBlockReaderFromDomain();
       if (reader != null) {
-        if (LOG.isTraceEnabled()) {
-          LOG.trace(this + ": returning new remote block reader using " +
-              "UNIX domain socket on " + pathInfo.getPath());
-        }
+        LOG.trace("{}: returning new remote block reader using UNIX domain "
+            + "socket on {}", this, pathInfo.getPath());
         return reader;
       }
     }
@@ -405,10 +399,8 @@
             setVisibleLength(visibleLength).
             build();
         if (accessor == null) {
-          if (LOG.isTraceEnabled()) {
-            LOG.trace(this + ": No ReplicaAccessor created by " +
-                cls.getName());
-          }
+          LOG.trace("{}: No ReplicaAccessor created by {}",
+              this, cls.getName());
         } else {
           return new ExternalBlockReader(accessor, visibleLength, startOffset);
         }
@@ -427,14 +419,10 @@
    * first introduced in HDFS-2246.
    */
   private BlockReader getLegacyBlockReaderLocal() throws IOException {
-    if (LOG.isTraceEnabled()) {
-      LOG.trace(this + ": trying to construct BlockReaderLocalLegacy");
-    }
+    LOG.trace("{}: trying to construct BlockReaderLocalLegacy", this);
     if (!DFSUtilClient.isLocalAddress(inetSocketAddress)) {
-      if (LOG.isTraceEnabled()) {
-        LOG.trace(this + ": can't construct BlockReaderLocalLegacy because " +
-            "the address " + inetSocketAddress + " is not local");
-      }
+      LOG.trace("{}: can't construct BlockReaderLocalLegacy because the address"
+          + "{} is not local", this, inetSocketAddress);
       return null;
     }
     if (clientContext.getDisableLegacyBlockReaderLocal()) {
@@ -470,10 +458,8 @@
   }
 
   private BlockReader getBlockReaderLocal() throws InvalidToken {
-    if (LOG.isTraceEnabled()) {
-      LOG.trace(this + ": trying to construct a BlockReaderLocal " +
-          "for short-circuit reads.");
-    }
+    LOG.trace("{}: trying to construct a BlockReaderLocal for short-circuit "
+        + " reads.", this);
     if (pathInfo == null) {
       pathInfo = clientContext.getDomainSocketFactory()
           .getPathInfo(inetSocketAddress, conf.getShortCircuitConf());
@@ -488,10 +474,8 @@
     ShortCircuitReplicaInfo info = cache.fetchOrCreate(key, this);
     InvalidToken exc = info.getInvalidTokenException();
     if (exc != null) {
-      if (LOG.isTraceEnabled()) {
-        LOG.trace(this + ": got InvalidToken exception while trying to " +
-            "construct BlockReaderLocal via " + pathInfo.getPath());
-      }
+      LOG.trace("{}: got InvalidToken exception while trying to construct "
+          + "BlockReaderLocal via {}", this, pathInfo.getPath());
       throw exc;
     }
     if (info.getReplica() == null) {
@@ -527,9 +511,7 @@
         createShortCircuitReplicaInfoCallback.createShortCircuitReplicaInfo();
       if (info != null) return info;
     }
-    if (LOG.isTraceEnabled()) {
-      LOG.trace(this + ": trying to create ShortCircuitReplicaInfo.");
-    }
+    LOG.trace("{}: trying to create ShortCircuitReplicaInfo.", this);
     BlockReaderPeer curPeer;
     while (true) {
       curPeer = nextDomainPeer();
@@ -544,10 +526,8 @@
             new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId()),
             clientName);
         if (usedPeer.booleanValue()) {
-          if (LOG.isTraceEnabled()) {
-            LOG.trace(this + ": allocShmSlot used up our previous socket " +
-              peer.getDomainSocket() + ".  Allocating a new one...");
-          }
+          LOG.trace("{}: allocShmSlot used up our previous socket {}.  "
+              + "Allocating a new one...", this, peer.getDomainSocket());
           curPeer = nextDomainPeer();
           if (curPeer == null) break;
           peer = (DomainPeer)curPeer.peer;
@@ -562,9 +542,7 @@
         if (curPeer.fromCache) {
           // Handle an I/O error we got when using a cached socket.
           // These are considered less serious, because the socket may be stale.
-          if (LOG.isDebugEnabled()) {
-            LOG.debug(this + ": closing stale domain peer " + peer, e);
-          }
+          LOG.debug("{}: closing stale domain peer {}", this, peer, e);
           IOUtilsClient.cleanup(LOG, peer);
         } else {
           // Handle an I/O error we got when using a newly created socket.
@@ -617,7 +595,7 @@
         ExtendedBlockId key =
             new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId());
         if (buf[0] == USE_RECEIPT_VERIFICATION.getNumber()) {
-          LOG.trace("Sending receipt verification byte for slot " + slot);
+          LOG.trace("Sending receipt verification byte for slot {}", slot);
           sock.getOutputStream().write(0);
         }
         replica = new ShortCircuitReplica(key, fis[0], fis[1], cache,
@@ -650,9 +628,7 @@
       String msg = "access control error while " +
           "attempting to set up short-circuit access to " +
           fileName + resp.getMessage();
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(this + ":" + msg);
-      }
+      LOG.debug("{}:{}", this, msg);
       return new ShortCircuitReplicaInfo(new InvalidToken(msg));
     default:
       LOG.warn(this + ": unknown response code " + resp.getStatus() +
@@ -684,10 +660,8 @@
            " is not usable.", this, pathInfo);
       return null;
     }
-    if (LOG.isTraceEnabled()) {
-      LOG.trace(this + ": trying to create a remote block reader from the " +
-          "UNIX domain socket at " + pathInfo.getPath());
-    }
+    LOG.trace("{}: trying to create a remote block reader from the UNIX domain "
+        + "socket at {}", this, pathInfo.getPath());
 
     while (true) {
       BlockReaderPeer curPeer = nextDomainPeer();
@@ -701,19 +675,15 @@
       } catch (IOException ioe) {
         IOUtilsClient.cleanup(LOG, peer);
         if (isSecurityException(ioe)) {
-          if (LOG.isTraceEnabled()) {
-            LOG.trace(this + ": got security exception while constructing " +
-                "a remote block reader from the unix domain socket at " +
-                pathInfo.getPath(), ioe);
-          }
+          LOG.trace("{}: got security exception while constructing a remote "
+                  + " block reader from the unix domain socket at {}",
+              this, pathInfo.getPath(), ioe);
           throw ioe;
         }
         if (curPeer.fromCache) {
           // Handle an I/O error we got when using a cached peer.  These are
           // considered less serious, because the underlying socket may be stale.
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Closed potentially stale domain peer " + peer, ioe);
-          }
+          LOG.debug("Closed potentially stale domain peer {}", peer, ioe);
         } else {
           // Handle an I/O error we got when using a newly created domain peer.
           // We temporarily disable the domain socket path for a few minutes in
@@ -747,10 +717,8 @@
    *             If there was another problem.
    */
   private BlockReader getRemoteBlockReaderFromTcp() throws IOException {
-    if (LOG.isTraceEnabled()) {
-      LOG.trace(this + ": trying to create a remote block reader from a " +
-          "TCP socket");
-    }
+    LOG.trace("{}: trying to create a remote block reader from a TCP socket",
+        this);
     BlockReader blockReader = null;
     while (true) {
       BlockReaderPeer curPeer = null;
@@ -763,19 +731,15 @@
         return blockReader;
       } catch (IOException ioe) {
         if (isSecurityException(ioe)) {
-          if (LOG.isTraceEnabled()) {
-            LOG.trace(this + ": got security exception while constructing " +
-                "a remote block reader from " + peer, ioe);
-          }
+          LOG.trace("{}: got security exception while constructing a remote "
+              + "block reader from {}", this, peer, ioe);
           throw ioe;
         }
         if ((curPeer != null) && curPeer.fromCache) {
           // Handle an I/O error we got when using a cached peer.  These are
           // considered less serious, because the underlying socket may be
           // stale.
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Closed potentially stale remote peer " + peer, ioe);
-          }
+          LOG.debug("Closed potentially stale remote peer {}", peer, ioe);
         } else {
           // Handle an I/O error we got when using a newly created peer.
           LOG.warn("I/O error constructing remote block reader.", ioe);
@@ -808,9 +772,7 @@
     if (remainingCacheTries > 0) {
       Peer peer = clientContext.getPeerCache().get(datanode, true);
       if (peer != null) {
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("nextDomainPeer: reusing existing peer " + peer);
-        }
+        LOG.trace("nextDomainPeer: reusing existing peer {}", peer);
         return new BlockReaderPeer(peer, true);
       }
     }
@@ -832,24 +794,18 @@
     if (remainingCacheTries > 0) {
       Peer peer = clientContext.getPeerCache().get(datanode, false);
       if (peer != null) {
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("nextTcpPeer: reusing existing peer " + peer);
-        }
+        LOG.trace("nextTcpPeer: reusing existing peer {}", peer);
         return new BlockReaderPeer(peer, true);
       }
     }
     try {
       Peer peer = remotePeerFactory.newConnectedPeer(inetSocketAddress, token,
         datanode);
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("nextTcpPeer: created newConnectedPeer " + peer);
-      }
+      LOG.trace("nextTcpPeer: created newConnectedPeer {}", peer);
       return new BlockReaderPeer(peer, false);
     } catch (IOException e) {
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("nextTcpPeer: failed to create newConnectedPeer " +
-                  "connected to " + datanode);
-      }
+      LOG.trace("nextTcpPeer: failed to create newConnectedPeer connected to"
+          + "{}", datanode);
       throw e;
     }
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
index 2575f10..7d0822e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
@@ -412,17 +412,10 @@
   public synchronized int read(ByteBuffer buf) throws IOException {
     boolean canSkipChecksum = createNoChecksumContext();
     try {
-      String traceString = null;
-      if (LOG.isTraceEnabled()) {
-        traceString = new StringBuilder().
-            append("read(").
-            append("buf.remaining=").append(buf.remaining()).
-            append(", block=").append(block).
-            append(", filename=").append(filename).
-            append(", canSkipChecksum=").append(canSkipChecksum).
-            append(")").toString();
-        LOG.info(traceString + ": starting");
-      }
+      String traceFormatStr = "read(buf.remaining={}, block={}, filename={}, "
+          + "canSkipChecksum={})";
+      LOG.trace(traceFormatStr + ": starting",
+          buf.remaining(), block, filename, canSkipChecksum);
       int nRead;
       try {
         if (canSkipChecksum && zeroReadaheadRequested) {
@@ -431,14 +424,12 @@
           nRead = readWithBounceBuffer(buf, canSkipChecksum);
         }
       } catch (IOException e) {
-        if (LOG.isTraceEnabled()) {
-          LOG.info(traceString + ": I/O error", e);
-        }
+        LOG.trace(traceFormatStr + ": I/O error",
+            buf.remaining(), block, filename, canSkipChecksum, e);
         throw e;
       }
-      if (LOG.isTraceEnabled()) {
-        LOG.info(traceString + ": returning " + nRead);
-      }
+      LOG.trace(traceFormatStr + ": returning {}",
+          buf.remaining(), block, filename, canSkipChecksum, nRead);
       return nRead;
     } finally {
       if (canSkipChecksum) releaseNoChecksumContext();
@@ -490,10 +481,8 @@
     }
     dataBuf.limit(dataBuf.position());
     dataBuf.position(Math.min(dataBuf.position(), slop));
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("loaded " + dataBuf.remaining() + " bytes into bounce " +
-          "buffer from offset " + oldDataPos + " of " + block);
-    }
+    LOG.trace("loaded {} bytes into bounce buffer from offset {} of {}",
+        dataBuf.remaining(), oldDataPos, block);
     return dataBuf.limit() != maxReadaheadLength;
   }
 
@@ -565,18 +554,10 @@
     boolean canSkipChecksum = createNoChecksumContext();
     int nRead;
     try {
-      String traceString = null;
-      if (LOG.isTraceEnabled()) {
-        traceString = new StringBuilder().
-            append("read(arr.length=").append(arr.length).
-            append(", off=").append(off).
-            append(", len=").append(len).
-            append(", filename=").append(filename).
-            append(", block=").append(block).
-            append(", canSkipChecksum=").append(canSkipChecksum).
-            append(")").toString();
-        LOG.trace(traceString + ": starting");
-      }
+      final String traceFormatStr = "read(arr.length={}, off={}, len={}, "
+          + "filename={}, block={}, canSkipChecksum={})";
+      LOG.trace(traceFormatStr + ": starting",
+          arr.length, off, len, filename, block, canSkipChecksum);
       try {
         if (canSkipChecksum && zeroReadaheadRequested) {
           nRead = readWithoutBounceBuffer(arr, off, len);
@@ -584,14 +565,12 @@
           nRead = readWithBounceBuffer(arr, off, len, canSkipChecksum);
         }
       } catch (IOException e) {
-        if (LOG.isTraceEnabled()) {
-          LOG.trace(traceString + ": I/O error", e);
-        }
+        LOG.trace(traceFormatStr + ": I/O error",
+            arr.length, off, len, filename, block, canSkipChecksum, e);
         throw e;
       }
-      if (LOG.isTraceEnabled()) {
-        LOG.trace(traceString + ": returning " + nRead);
-      }
+      LOG.trace(traceFormatStr + ": returning {}",
+          arr.length, off, len, filename, block, canSkipChecksum, nRead);
     } finally {
       if (canSkipChecksum) releaseNoChecksumContext();
     }
@@ -634,11 +613,9 @@
       dataBuf.position(dataBuf.position() + discardedFromBuf);
       remaining -= discardedFromBuf;
     }
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("skip(n=" + n + ", block=" + block + ", filename=" + 
-        filename + "): discarded " + discardedFromBuf + " bytes from " +
-        "dataBuf and advanced dataPos by " + remaining);
-    }
+    LOG.trace("skip(n={}, block={}, filename={}): discarded {} bytes from "
+            + "dataBuf and advanced dataPos by {}",
+        n, block, filename, discardedFromBuf, remaining);
     dataPos += remaining;
     return n;
   }
@@ -653,9 +630,7 @@
   public synchronized void close() throws IOException {
     if (closed) return;
     closed = true;
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("close(filename=" + filename + ", block=" + block + ")");
-    }
+    LOG.trace("close(filename={}, block={})", filename, block);
     replica.unref();
     freeDataBufIfExists();
     freeChecksumBufIfExists();
@@ -705,11 +680,9 @@
         (opts.contains(ReadOption.SKIP_CHECKSUMS) == false);
     if (anchor) {
       if (!createNoChecksumContext()) {
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("can't get an mmap for " + block + " of " + filename + 
-              " since SKIP_CHECKSUMS was not given, " +
-              "we aren't skipping checksums, and the block is not mlocked.");
-        }
+        LOG.trace("can't get an mmap for {} of {} since SKIP_CHECKSUMS was not "
+            + "given, we aren't skipping checksums, and the block is not "
+            + "mlocked.", block, filename);
         return null;
       }
     }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java
index c3d7202..0afd4ba 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java
@@ -221,11 +221,9 @@
       File blkfile = new File(pathinfo.getBlockPath());
       dataIn = new FileInputStream(blkfile);
 
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("New BlockReaderLocalLegacy for file " + blkfile + " of size "
-            + blkfile.length() + " startOffset " + startOffset + " length "
-            + length + " short circuit checksum " + !skipChecksumCheck);
-      }
+      LOG.debug("New BlockReaderLocalLegacy for file {} of size {} startOffset "
+              + "{} length {} short circuit checksum {}",
+          blkfile, blkfile.length(), startOffset, length, !skipChecksumCheck);
 
       if (!skipChecksumCheck) {
         // get the metadata file
@@ -292,9 +290,7 @@
       // channel for the DataNode to notify the client that the path has been
       // invalidated.  Therefore, our only option is to skip caching.
       if (pathinfo != null && !storageType.isTransient()) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Cached location of block " + blk + " as " + pathinfo);
-        }
+        LOG.debug("Cached location of block {} as {}", blk, pathinfo);
         localDatanodeInfo.setBlockLocalPathInfo(blk, pathinfo);
       }
     } catch (IOException e) {
@@ -603,9 +599,7 @@
 
   @Override
   public synchronized int read(byte[] buf, int off, int len) throws IOException {
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("read off " + off + " len " + len);
-    }
+    LOG.trace("read off {} len {}", off, len);
     if (!verifyChecksum) {
       return dataIn.read(buf, off, len);
     }
@@ -624,9 +618,7 @@
 
   @Override
   public synchronized long skip(long n) throws IOException {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("skip " + n);
-    }
+    LOG.debug("skip {}", n);
     if (n <= 0) {
       return 0;
     }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index be346a4..183602a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -425,9 +425,7 @@
     }
     final int idx = r.nextInt(localInterfaceAddrs.length);
     final SocketAddress addr = localInterfaceAddrs[idx];
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Using local interface " + addr);
-    }
+    LOG.debug("Using local interface {}", addr);
     return addr;
   }
 
@@ -1232,9 +1230,7 @@
                              InetSocketAddress[] favoredNodes) throws IOException {
     checkOpen();
     final FsPermission masked = applyUMask(permission);
-    if(LOG.isDebugEnabled()) {
-      LOG.debug(src + ": masked=" + masked);
-    }
+    LOG.debug("{}: masked={}", src, masked);
     final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this,
         src, masked, flag, createParent, replication, blockSize, progress,
         buffersize, dfsClientConf.createChecksum(checksumOpt),
@@ -1831,10 +1827,8 @@
               smallBufferSize));
           in = new DataInputStream(pair.in);
 
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("write to " + datanodes[j] + ": "
-                + Op.BLOCK_CHECKSUM + ", block=" + block);
-          }
+          LOG.debug("write to {}: {}, block={}",
+              datanodes[j], Op.BLOCK_CHECKSUM, block);
           // get block MD5
           new Sender(out).blockChecksum(block, lb.getBlockToken());
 
@@ -1898,12 +1892,10 @@
           }
         } catch (InvalidBlockTokenException ibte) {
           if (i > lastRetriedIndex) {
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Got access token error in response to OP_BLOCK_CHECKSUM "
-                  + "for file " + src + " for block " + block
-                  + " from datanode " + datanodes[j]
-                  + ". Will retry the block once.");
-            }
+            LOG.debug("Got access token error in response to OP_BLOCK_CHECKSUM "
+                  + "for file {} for block {} from datanode {}. Will retry the "
+                  + "block once.",
+                src, block, datanodes[j]);
             lastRetriedIndex = i;
             done = true; // actually it's not done; but we'll retry
             i--; // repeat at i-th block
@@ -1957,9 +1949,7 @@
     try {
       sock = socketFactory.createSocket();
       String dnAddr = dn.getXferAddr(getConf().isConnectToDnViaHostname());
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Connecting to datanode " + dnAddr);
-      }
+      LOG.debug("Connecting to datanode {}", dnAddr);
       NetUtils.connect(sock, NetUtils.createSocketAddr(dnAddr), timeout);
       sock.setSoTimeout(timeout);
   
@@ -2579,9 +2569,7 @@
       absPermission = applyUMask(null);
     } 
 
-    if(LOG.isDebugEnabled()) {
-      LOG.debug(src + ": masked=" + absPermission);
-    }
+    LOG.debug("{}: masked={}", src, absPermission);
     TraceScope scope = tracer.newScope("mkdir");
     try {
       return namenode.mkdirs(src, absPermission, createParent);
@@ -3103,9 +3091,7 @@
       }
     });
     HEDGED_READ_THREAD_POOL.allowCoreThreadTimeOut(true);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Using hedged reads; pool threads=" + num);
-    }
+    LOG.debug("Using hedged reads; pool threads={}", num);
   }
 
   /**
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index d9f409c..ab5faae 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -314,9 +314,7 @@
     if (locatedBlocks == null || refresh) {
       newInfo = dfsClient.getLocatedBlocks(src, 0);
     }
-    if (DFSClient.LOG.isDebugEnabled()) {
-      DFSClient.LOG.debug("newInfo = " + newInfo);
-    }
+    DFSClient.LOG.debug("newInfo = {}", newInfo);
     if (newInfo == null) {
       throw new IOException("Cannot open filename " + src);
     }
@@ -382,10 +380,8 @@
           replicaNotFoundCount--;
         }
         
-        if (DFSClient.LOG.isDebugEnabled()) {
-          DFSClient.LOG.debug("Failed to getReplicaVisibleLength from datanode "
-              + datanode + " for block " + locatedblock.getBlock(), ioe);
-        }
+        DFSClient.LOG.debug("Failed to getReplicaVisibleLength from datanode {}"
+              + " for block {}", datanode, locatedblock.getBlock(), ioe);
       } finally {
         if (cdp != null) {
           RPC.stopProxy(cdp);
@@ -1064,9 +1060,7 @@
     }
     final String dnAddr =
         chosenNode.getXferAddr(dfsClient.getConf().isConnectToDnViaHostname());
-    if (DFSClient.LOG.isDebugEnabled()) {
-      DFSClient.LOG.debug("Connecting to datanode " + dnAddr);
-    }
+    DFSClient.LOG.debug("Connecting to datanode {}", dnAddr);
     InetSocketAddress targetAddr = NetUtils.createSocketAddr(dnAddr);
     return new DNAddrPair(chosenNode, targetAddr, storageType);
   }
@@ -1281,11 +1275,8 @@
             future.get();
             return;
           }
-          if (DFSClient.LOG.isDebugEnabled()) {
-            DFSClient.LOG.debug("Waited " + conf.getHedgedReadThresholdMillis()
-                + "ms to read from " + chosenNode.info
-                + "; spawning hedged read");
-          }
+          DFSClient.LOG.debug("Waited {}ms to read from {}; spawning hedged "
+              + "read", conf.getHedgedReadThresholdMillis(), chosenNode.info);
           // Ignore this node on next go around.
           ignored.add(chosenNode.info);
           dfsClient.getHedgedReadMetrics().incHedgedReadOps();
@@ -1312,10 +1303,8 @@
               .submit(getFromDataNodeCallable);
           futures.add(oneMoreRequest);
         } catch (IOException ioe) {
-          if (DFSClient.LOG.isDebugEnabled()) {
-            DFSClient.LOG.debug("Failed getting node for hedged read: "
-                + ioe.getMessage());
-          }
+          DFSClient.LOG.debug("Failed getting node for hedged read: {}",
+              ioe.getMessage());
         }
         // if not succeeded. Submit callables for each datanode in a loop, wait
         // for a fixed interval and get the result from the fastest one.
@@ -1571,11 +1560,8 @@
             throw new IOException(errMsg);
           }
         } catch (IOException e) {//make following read to retry
-          if(DFSClient.LOG.isDebugEnabled()) {
-            DFSClient.LOG.debug("Exception while seek to " + targetPos
-                + " from " + getCurrentBlock() + " of " + src + " from "
-                + currentNode, e);
-          }
+          DFSClient.LOG.debug("Exception while seek to {} from {} of {} from "
+              + "{}", targetPos, getCurrentBlock(), src, currentNode, e);
         }
       }
     }
@@ -1791,20 +1777,16 @@
     } else {
       length63 = 1 + curEnd - curPos;
       if (length63 <= 0) {
-        if (DFSClient.LOG.isDebugEnabled()) {
-          DFSClient.LOG.debug("Unable to perform a zero-copy read from offset " +
-            curPos + " of " + src + "; " + length63 + " bytes left in block.  " +
-            "blockPos=" + blockPos + "; curPos=" + curPos +
-            "; curEnd=" + curEnd);
-        }
+        DFSClient.LOG.debug("Unable to perform a zero-copy read from offset {}"
+                + " of {}; {} bytes left in block. blockPos={}; curPos={};"
+                + "curEnd={}",
+            curPos, src, length63, blockPos, curPos, curEnd);
         return null;
       }
-      if (DFSClient.LOG.isDebugEnabled()) {
-        DFSClient.LOG.debug("Reducing read length from " + maxLength +
-            " to " + length63 + " to avoid going more than one byte " +
-            "past the end of the block.  blockPos=" + blockPos +
-            "; curPos=" + curPos + "; curEnd=" + curEnd);
-      }
+      DFSClient.LOG.debug("Reducing read length from {} to {} to avoid going "
+              + "more than one byte past the end of the block.  blockPos={}; "
+              +" curPos={}; curEnd={}",
+          maxLength, length63, blockPos, curPos, curEnd);
     }
     // Make sure that don't go beyond 31-bit offsets in the MappedByteBuffer.
     int length;
@@ -1818,28 +1800,20 @@
         // So we can't mmap the parts of the block higher than the 2 GB offset.
         // FIXME: we could work around this with multiple memory maps.
         // See HDFS-5101.
-        if (DFSClient.LOG.isDebugEnabled()) {
-          DFSClient.LOG.debug("Unable to perform a zero-copy read from offset " +
-            curPos + " of " + src + "; 31-bit MappedByteBuffer limit " +
-            "exceeded.  blockPos=" + blockPos + ", curEnd=" + curEnd);
-        }
+        DFSClient.LOG.debug("Unable to perform a zero-copy read from offset {} "
+            + " of {}; 31-bit MappedByteBuffer limit exceeded.  blockPos={}, "
+            + "curEnd={}", curPos, src, blockPos, curEnd);
         return null;
       }
       length = (int)length31;
-      if (DFSClient.LOG.isDebugEnabled()) {
-        DFSClient.LOG.debug("Reducing read length from " + maxLength +
-            " to " + length + " to avoid 31-bit limit.  " +
-            "blockPos=" + blockPos + "; curPos=" + curPos +
-            "; curEnd=" + curEnd);
-      }
+      DFSClient.LOG.debug("Reducing read length from {} to {} to avoid 31-bit "
+          + "limit.  blockPos={}; curPos={}; curEnd={}",
+          maxLength, length, blockPos, curPos, curEnd);
     }
     final ClientMmap clientMmap = blockReader.getClientMmap(opts);
     if (clientMmap == null) {
-      if (DFSClient.LOG.isDebugEnabled()) {
-        DFSClient.LOG.debug("unable to perform a zero-copy read from offset " +
-          curPos + " of " + src + "; BlockReader#getClientMmap returned " +
-          "null.");
-      }
+      DFSClient.LOG.debug("unable to perform a zero-copy read from offset {} of"
+          + " {}; BlockReader#getClientMmap returned null.", curPos, src);
       return null;
     }
     boolean success = false;
@@ -1853,11 +1827,8 @@
       synchronized (infoLock) {
         readStatistics.addZeroCopyBytes(length);
       }
-      if (DFSClient.LOG.isDebugEnabled()) {
-        DFSClient.LOG.debug("readZeroCopy read " + length + 
-            " bytes from offset " + curPos + " via the zero-copy read " +
-            "path.  blockEnd = " + blockEnd);
-      }
+      DFSClient.LOG.debug("readZeroCopy read {} bytes from offset {} via the "
+          + "zero-copy read path.  blockEnd = {}", length, curPos, blockEnd);
       success = true;
     } finally {
       if (!success) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index 78eaa6c..6039177 100755
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -191,9 +191,9 @@
     this.fileEncryptionInfo = stat.getFileEncryptionInfo();
     this.cachingStrategy = new AtomicReference<CachingStrategy>(
         dfsClient.getDefaultWriteCachingStrategy());
-    if ((progress != null) && DFSClient.LOG.isDebugEnabled()) {
-      DFSClient.LOG.debug(
-          "Set non-null progress callback on DFSOutputStream " + src);
+    if (progress != null) {
+      DFSClient.LOG.debug("Set non-null progress callback on DFSOutputStream "
+          +"{}", src);
     }
 
     this.bytesPerChecksum = checksum.getBytesPerChecksum();
@@ -378,12 +378,9 @@
     final int chunkSize = csize + getChecksumSize();
     chunksPerPacket = Math.max(bodySize/chunkSize, 1);
     packetSize = chunkSize*chunksPerPacket;
-    if (DFSClient.LOG.isDebugEnabled()) {
-      DFSClient.LOG.debug("computePacketChunkSize: src=" + src +
-                ", chunkSize=" + chunkSize +
-                ", chunksPerPacket=" + chunksPerPacket +
-                ", packetSize=" + packetSize);
-    }
+    DFSClient.LOG.debug("computePacketChunkSize: src={}, chunkSize={}, "
+            + "chunksPerPacket={}, packetSize={}",
+        src, chunkSize, chunksPerPacket, packetSize);
   }
 
   protected TraceScope createWriteTraceScope() {
@@ -410,14 +407,10 @@
     if (currentPacket == null) {
       currentPacket = createPacket(packetSize, chunksPerPacket, getStreamer()
           .getBytesCurBlock(), getStreamer().getAndIncCurrentSeqno(), false);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("WriteChunk allocating new packet seqno=" +
-            currentPacket.getSeqno() +
-            ", src=" + src +
-            ", packetSize=" + packetSize +
-            ", chunksPerPacket=" + chunksPerPacket +
-            ", bytesCurBlock=" + getStreamer().getBytesCurBlock() + ", " + this);
-      }
+      DFSClient.LOG.debug("WriteChunk allocating new packet seqno={},"
+              + " src={}, packetSize={}, chunksPerPacket={}, bytesCurBlock={}",
+          currentPacket.getSeqno(), src, packetSize, chunksPerPacket,
+          getStreamer().getBytesCurBlock() + ", " + this);
     }
 
     currentPacket.writeChecksum(checksum, ckoff, cklen);
@@ -570,12 +563,9 @@
         int numKept = flushBuffer(!endBlock, true);
         // bytesCurBlock potentially incremented if there was buffered data
 
-        if (DFSClient.LOG.isDebugEnabled()) {
-          DFSClient.LOG.debug("DFSClient flush(): "
-              + " bytesCurBlock=" + getStreamer().getBytesCurBlock()
-              + " lastFlushOffset=" + lastFlushOffset
-              + " createNewBlock=" + endBlock);
-        }
+        DFSClient.LOG.debug("DFSClient flush():  bytesCurBlock={}, "
+                + "lastFlushOffset={}, createNewBlock={}",
+            getStreamer().getBytesCurBlock(), lastFlushOffset, endBlock);
         // Flush only if we haven't already flushed till this offset.
         if (lastFlushOffset != getStreamer().getBytesCurBlock()) {
           assert getStreamer().getBytesCurBlock() > lastFlushOffset;
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
index f96ae65..d1829d6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
@@ -463,19 +463,13 @@
     InetAddress addr = targetAddr.getAddress();
     Boolean cached = localAddrMap.get(addr.getHostAddress());
     if (cached != null) {
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("Address " + targetAddr +
-            (cached ? " is local" : " is not local"));
-      }
+      LOG.trace("Address {} is {} local", targetAddr, (cached ? "" : "not"));
       return cached;
     }
 
     boolean local = NetUtils.isLocalAddress(addr);
 
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("Address " + targetAddr +
-          (local ? " is local" : " is not local"));
-    }
+    LOG.trace("Address {} is {} local", targetAddr, (local ? "" : "not"));
     localAddrMap.put(addr.getHostAddress(), local);
     return local;
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
index af7a61e..683d98d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
@@ -124,18 +124,14 @@
       final int length, final DFSClient client) throws IOException {
     final DfsClientConf conf = client.getConf();
     final String dnAddr = first.getXferAddr(conf.isConnectToDnViaHostname());
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Connecting to datanode " + dnAddr);
-    }
+    LOG.debug("Connecting to datanode {}", dnAddr);
     final InetSocketAddress isa = NetUtils.createSocketAddr(dnAddr);
     final Socket sock = client.socketFactory.createSocket();
     final int timeout = client.getDatanodeReadTimeout(length);
     NetUtils.connect(sock, isa, client.getRandomLocalInterfaceAddr(), conf.getSocketTimeout());
     sock.setSoTimeout(timeout);
     sock.setSendBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Send buf size " + sock.getSendBufferSize());
-    }
+    LOG.debug("Send buf size {}", sock.getSendBufferSize());
     return sock;
   }
 
@@ -508,9 +504,7 @@
   }
 
   protected void endBlock() {
-    if(LOG.isDebugEnabled()) {
-      LOG.debug("Closing old block " + block);
-    }
+    LOG.debug("Closing old block " + block);
     this.setName("DataStreamer for file " + src);
     closeResponder();
     closeStream();
@@ -594,9 +588,11 @@
           LOG.debug("stage=" + stage + ", " + this);
         }
         if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
+          LOG.debug("Allocating new block");
           setPipeline(nextBlockOutputStream());
           initDataStreaming();
         } else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
+          LOG.debug("Append to block {}", block);
           setupPipelineForAppendOrRecovery();
           if (streamerClosed) {
             continue;
@@ -645,9 +641,7 @@
           }
         }
 
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(this + " sending " + one);
-        }
+        LOG.debug(this + " sending " + one);
 
         // write out data to remote datanode
         TraceScope writeScope = dfsClient.getTracer().
@@ -758,9 +752,7 @@
     TraceScope scope = dfsClient.getTracer().
         newScope("waitForAckedSeqno");
     try {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Waiting for ack for: " + seqno);
-      }
+      LOG.debug("Waiting for ack for: {}", seqno);
       long begin = Time.monotonicNow();
       try {
         synchronized (dataQueue) {
@@ -975,8 +967,8 @@
             LOG.warn("Slow ReadProcessor read fields took " + duration
                 + "ms (threshold=" + dfsclientSlowLogThresholdMs + "ms); ack: "
                 + ack + ", targets: " + Arrays.asList(targets));
-          } else if (LOG.isDebugEnabled()) {
-            LOG.debug("DFSClient " + ack);
+          } else {
+            LOG.debug("DFSClient {}", ack);
           }
 
           long seqno = ack.getSeqno();
@@ -1201,9 +1193,7 @@
   }
 
   private void addDatanode2ExistingPipeline() throws IOException {
-    if (DataTransferProtocol.LOG.isDebugEnabled()) {
-      DataTransferProtocol.LOG.debug("lastAckedSeqno = " + lastAckedSeqno);
-    }
+    DataTransferProtocol.LOG.debug("lastAckedSeqno = {}", lastAckedSeqno);
       /*
        * Is data transfer necessary?  We have the following cases.
        *
@@ -1679,10 +1669,8 @@
           new HashSet<String>(Arrays.asList(favoredNodes));
       for (int i = 0; i < nodes.length; i++) {
         pinnings[i] = favoredSet.remove(nodes[i].getXferAddrWithHostname());
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(nodes[i].getXferAddrWithHostname() +
-              " was chosen by name node (favored=" + pinnings[i] + ").");
-        }
+        LOG.debug("{} was chosen by name node (favored={}).",
+            nodes[i].getXferAddrWithHostname(), pinnings[i]);
       }
       if (shouldLog && !favoredSet.isEmpty()) {
         // There is one or more favored nodes that were not allocated.
@@ -1785,9 +1773,7 @@
       packet.addTraceParent(Tracer.getCurrentSpanId());
       dataQueue.addLast(packet);
       lastQueuedSeqno = packet.getSeqno();
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Queued " + packet + ", " + this);
-      }
+      LOG.debug("Queued " + packet + ", " + this);
       dataQueue.notifyAll();
     }
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
index 5255862..81ae829 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
@@ -250,9 +250,7 @@
       PacketHeader header = new PacketHeader();
       header.readFields(in);
 
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("DFSClient readChunk got header " + header);
-      }
+      LOG.debug("DFSClient readChunk got header {}", header);
 
       // Sanity check the lengths
       if (!header.sanityCheck(lastSeqNo)) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
index 10f310d..942c37c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
@@ -135,14 +135,9 @@
   @Override
   public synchronized int read(byte[] buf, int off, int len) 
                                throws IOException {
-
-    UUID randomId = null;
-    if (LOG.isTraceEnabled()) {
-      randomId = UUID.randomUUID();
-      LOG.trace(String.format("Starting read #%s file %s from datanode %s",
-        randomId.toString(), this.filename,
-        this.datanodeID.getHostName()));
-    }
+    UUID randomId = (LOG.isTraceEnabled() ? UUID.randomUUID() : null);
+    LOG.trace("Starting read #{} file {} from datanode {}",
+        randomId, filename, datanodeID.getHostName());
 
     if (curDataSlice == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) {
       TraceScope scope = tracer.newScope(
@@ -154,9 +149,7 @@
       }
     }
 
-    if (LOG.isTraceEnabled()) {
-      LOG.trace(String.format("Finishing read #" + randomId));
-    }
+    LOG.trace("Finishing read #{}", randomId);
 
     if (curDataSlice.remaining() == 0) {
       // we're at EOF now
@@ -203,9 +196,7 @@
     curDataSlice = packetReceiver.getDataSlice();
     assert curDataSlice.capacity() == curHeader.getDataLen();
     
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("DFSClient readNextPacket got header " + curHeader);
-    }
+    LOG.trace("DFSClient readNextPacket got header {}", curHeader);
 
     // Sanity check the lengths
     if (!curHeader.sanityCheck(lastSeqNo)) {
@@ -276,10 +267,8 @@
   }
 
   private void readTrailingEmptyPacket() throws IOException {
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("Reading empty packet at end of read");
-    }
-    
+    LOG.trace("Reading empty packet at end of read");
+
     packetReceiver.receiveNextPacket(in);
 
     PacketHeader trailer = packetReceiver.getHeader();
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
index 11be51f..416384e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
@@ -149,8 +149,6 @@
       "dfs.client.test.drop.namenode.response.number";
   int     DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_DEFAULT = 0;
   String  DFS_CLIENT_LOCAL_INTERFACES = "dfs.client.local.interfaces";
-  // HDFS client HTrace configuration.
-  String  DFS_CLIENT_HTRACE_PREFIX = "dfs.client.htrace.";
   String  DFS_USER_HOME_DIR_PREFIX_KEY = "dfs.user.home.dir.prefix";
   String  DFS_USER_HOME_DIR_PREFIX_DEFAULT = "/user";
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/LeaseRenewer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/LeaseRenewer.java
index c3d2cfc..8457d65 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/LeaseRenewer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/LeaseRenewer.java
@@ -308,10 +308,7 @@
               }
               LeaseRenewer.this.run(id);
             } catch(InterruptedException e) {
-              if (LOG.isDebugEnabled()) {
-                LOG.debug(LeaseRenewer.this.getClass().getSimpleName()
-                    + " is interrupted.", e);
-              }
+              LOG.debug("LeaseRenewer is interrupted.", e);
             } finally {
               synchronized(LeaseRenewer.this) {
                 Factory.INSTANCE.remove(LeaseRenewer.this);
@@ -399,9 +396,7 @@
     }
 
     if (daemonCopy != null) {
-      if(LOG.isDebugEnabled()) {
-        LOG.debug("Wait for lease checker to terminate");
-      }
+      LOG.debug("Wait for lease checker to terminate");
       daemonCopy.join();
     }
   }
@@ -424,16 +419,11 @@
       //skip if current client name is the same as the previous name.
       if (!c.getClientName().equals(previousName)) {
         if (!c.renewLease()) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Did not renew lease for client " +
-                c);
-          }
+          LOG.debug("Did not renew lease for client {}", c);
           continue;
         }
         previousName = c.getClientName();
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Lease renewed for client " + previousName);
-        }
+        LOG.debug("Lease renewed for client {}", previousName);
       }
     }
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java
index c4093b1..e6709d9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java
@@ -147,11 +147,9 @@
       throw new IOException("Invalid header length " + headerLen);
     }
     
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("readNextPacket: dataPlusChecksumLen = " + dataPlusChecksumLen +
-          " headerLen = " + headerLen);
-    }
-    
+    LOG.trace("readNextPacket: dataPlusChecksumLen={}, headerLen={}",
+        dataPlusChecksumLen, headerLen);
+
     // Sanity check the buffer size so we don't allocate too much memory
     // and OOME.
     int totalLen = payloadLen + headerLen;
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
index e856211..d2bc348 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
@@ -73,10 +73,8 @@
 
   private static void send(final DataOutputStream out, final Op opcode,
       final Message proto) throws IOException {
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("Sending DataTransferOp " + proto.getClass().getSimpleName()
-          + ": " + proto);
-    }
+    LOG.trace("Sending DataTransferOp {}: {}",
+        proto.getClass().getSimpleName(), proto);
     op(out, opcode);
     proto.writeDelimitedTo(out);
     out.flush();
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataTransferSaslUtil.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataTransferSaslUtil.java
index 256caff..006d304 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataTransferSaslUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataTransferSaslUtil.java
@@ -332,11 +332,9 @@
   public static IOStreamPair createStreamPair(Configuration conf,
       CipherOption cipherOption, OutputStream out, InputStream in, 
       boolean isServer) throws IOException {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Creating IOStreamPair of CryptoInputStream and " +
-          "CryptoOutputStream.");
-    }
-    CryptoCodec codec = CryptoCodec.getInstance(conf, 
+    LOG.debug("Creating IOStreamPair of CryptoInputStream and "
+        + "CryptoOutputStream.");
+    CryptoCodec codec = CryptoCodec.getInstance(conf,
         cipherOption.getCipherSuite());
     byte[] inKey = cipherOption.getInKey();
     byte[] inIv = cipherOption.getInIv();
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java
index f764275..24e1dd2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java
@@ -130,9 +130,7 @@
       throws IOException {
     final String dnAddr = datanodeid.getIpcAddr(connectToDnViaHostname);
     InetSocketAddress addr = NetUtils.createSocketAddr(dnAddr);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Connecting to datanode " + dnAddr + " addr=" + addr);
-    }
+    LOG.debug("Connecting to datanode {} addr={}", dnAddr, addr);
     rpcProxy = createClientDatanodeProtocolProxy(addr,
         UserGroupInformation.getCurrentUser(), conf,
         NetUtils.getDefaultSocketFactory(conf), socketTimeout);
@@ -143,10 +141,8 @@
       boolean connectToDnViaHostname, LocatedBlock locatedBlock) throws IOException {
     final String dnAddr = datanodeid.getIpcAddr(connectToDnViaHostname);
     InetSocketAddress addr = NetUtils.createSocketAddr(dnAddr);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Connecting to datanode " + dnAddr + " addr=" + addr);
-    }
-    
+    LOG.debug("Connecting to datanode {} addr={}", dnAddr, addr);
+
     // Since we're creating a new UserGroupInformation here, we know that no
     // future RPC proxies will be able to re-use the same connection. And
     // usages of this proxy tend to be one-off calls.
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShmManager.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShmManager.java
index f70398a..4ffc108 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShmManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShmManager.java
@@ -129,18 +129,13 @@
       ShmId shmId = shm.getShmId();
       Slot slot = shm.allocAndRegisterSlot(blockId);
       if (shm.isFull()) {
-        if (LOG.isTraceEnabled()) {
-          LOG.trace(this + ": pulled the last slot " + slot.getSlotIdx() +
-              " out of " + shm);
-        }
+        LOG.trace("{}: pulled the last slot {} out of {}",
+            this, slot.getSlotIdx(), shm);
         DfsClientShm removedShm = notFull.remove(shmId);
         Preconditions.checkState(removedShm == shm);
         full.put(shmId, shm);
       } else {
-        if (LOG.isTraceEnabled()) {
-          LOG.trace(this + ": pulled slot " + slot.getSlotIdx() +
-              " out of " + shm);
-        }
+        LOG.trace("{}: pulled slot {} out of {}", this, slot.getSlotIdx(), shm);
       }
       return slot;
     }
@@ -187,9 +182,7 @@
           DfsClientShm shm = 
               new DfsClientShm(PBHelperClient.convert(resp.getId()),
                   fis[0], this, peer);
-          if (LOG.isTraceEnabled()) {
-            LOG.trace(this + ": createNewShm: created " + shm);
-          }
+          LOG.trace("{}: createNewShm: created {}", this, shm);
           return shm;
         } finally {
           try {
@@ -234,15 +227,11 @@
         String clientName, ExtendedBlockId blockId) throws IOException {
       while (true) {
         if (closed) {
-          if (LOG.isTraceEnabled()) {
-            LOG.trace(this + ": the DfsClientShmManager has been closed.");
-          }
+          LOG.trace("{}: the DfsClientShmManager has been closed.", this);
           return null;
         }
         if (disabled) {
-          if (LOG.isTraceEnabled()) {
-            LOG.trace(this + ": shared memory segment access is disabled.");
-          }
+          LOG.trace("{}: shared memory segment access is disabled.", this);
           return null;
         }
         // Try to use an existing slot.
@@ -253,9 +242,7 @@
         // There are no free slots.  If someone is loading more slots, wait
         // for that to finish.
         if (loading) {
-          if (LOG.isTraceEnabled()) {
-            LOG.trace(this + ": waiting for loading to finish...");
-          }
+          LOG.trace("{}: waiting for loading to finish...", this);
           finishedLoading.awaitUninterruptibly();
         } else {
           // Otherwise, load the slot ourselves.
@@ -282,11 +269,9 @@
             // fired and marked the shm as disconnected.  In this case, we
             // obviously don't want to add the SharedMemorySegment to our list
             // of valid not-full segments.
-            if (LOG.isDebugEnabled()) {
-              LOG.debug(this + ": the UNIX domain socket associated with " +
-                  "this short-circuit memory closed before we could make " +
-                  "use of the shm.");
-            }
+            LOG.debug("{}: the UNIX domain socket associated with this "
+                + "short-circuit memory closed before we could make use of "
+                + "the shm.", this);
           } else {
             notFull.put(shm.getShmId(), shm);
           }
@@ -309,9 +294,7 @@
         Preconditions.checkState(!full.containsKey(shm.getShmId()));
         Preconditions.checkState(!notFull.containsKey(shm.getShmId()));
         if (shm.isEmpty()) {
-          if (LOG.isTraceEnabled()) {
-            LOG.trace(this + ": freeing empty stale " + shm);
-          }
+          LOG.trace("{}: freeing empty stale {}", this, shm);
           shm.free();
         }
       } else {
@@ -336,10 +319,8 @@
           // lowest ID, but it could still occur.  In most workloads,
           // fragmentation should not be a major concern, since it doesn't impact
           // peak file descriptor usage or the speed of allocation.
-          if (LOG.isTraceEnabled()) {
-            LOG.trace(this + ": shutting down UNIX domain socket for " +
-                "empty " + shm);
-          }
+          LOG.trace("{}: shutting down UNIX domain socket for empty {}",
+              this, shm);
           shutdown(shm);
         } else {
           notFull.put(shmId, shm);
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java
index 52c1a6e..07f5064 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java
@@ -103,9 +103,7 @@
         if (ShortCircuitCache.this.closed) return;
         long curMs = Time.monotonicNow();
 
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(this + ": cache cleaner running at " + curMs);
-        }
+        LOG.debug("{}: cache cleaner running at {}", this, curMs);
 
         int numDemoted = demoteOldEvictableMmaped(curMs);
         int numPurged = 0;
@@ -127,11 +125,9 @@
           numPurged++;
         }
 
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(this + ": finishing cache cleaner run started at " +
-            curMs + ".  Demoted " + numDemoted + " mmapped replicas; " +
-            "purged " + numPurged + " replicas.");
-        }
+        LOG.debug("{}: finishing cache cleaner run started at {}. Demoted {} "
+            + "mmapped replicas; purged {} replicas.",
+            this, curMs, numDemoted, numPurged);
       } finally {
         ShortCircuitCache.this.lock.unlock();
       }
@@ -186,9 +182,7 @@
 
     @Override
     public void run() {
-      if (LOG.isTraceEnabled()) {
-        LOG.trace(ShortCircuitCache.this + ": about to release " + slot);
-      }
+      LOG.trace("{}: about to release {}", ShortCircuitCache.this, slot);
       final DfsClientShm shm = (DfsClientShm)slot.getShm();
       final DomainSocket shmSock = shm.getPeer().getDomainSocket();
       final String path = shmSock.getPath();
@@ -205,9 +199,7 @@
           String error = resp.hasError() ? resp.getError() : "(unknown)";
           throw new IOException(resp.getStatus().toString() + ": " + error);
         }
-        if (LOG.isTraceEnabled()) {
-          LOG.trace(ShortCircuitCache.this + ": released " + slot);
-        }
+        LOG.trace("{}: released {}", this, slot);
         success = true;
       } catch (IOException e) {
         LOG.error(ShortCircuitCache.this + ": failed to release " +
@@ -433,9 +425,7 @@
           purgeReason = "purging replica because it is stale.";
         }
         if (purgeReason != null) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug(this + ": " + purgeReason);
-          }
+          LOG.debug("{}: {}", this, purgeReason);
           purge(replica);
         }
       }
@@ -677,10 +667,8 @@
       ShortCircuitReplicaInfo info = null;
       do {
         if (closed) {
-          if (LOG.isTraceEnabled()) {
-            LOG.trace(this + ": can't fetchOrCreate " + key +
-                " because the cache is closed.");
-          }
+          LOG.trace("{}: can't fethchOrCreate {} because the cache is closed.",
+              this, key);
           return null;
         }
         Waitable<ShortCircuitReplicaInfo> waitable = replicaInfoMap.get(key);
@@ -688,9 +676,7 @@
           try {
             info = fetch(key, waitable);
           } catch (RetriableException e) {
-            if (LOG.isDebugEnabled()) {
-              LOG.debug(this + ": retrying " + e.getMessage());
-            }
+            LOG.debug("{}: retrying {}", this, e.getMessage());
             continue;
           }
         }
@@ -721,9 +707,7 @@
     // ShortCircuitReplica.  So we simply wait for it to complete.
     ShortCircuitReplicaInfo info;
     try {
-      if (LOG.isTraceEnabled()) {
-        LOG.trace(this + ": found waitable for " + key);
-      }
+      LOG.trace("{}: found waitable for {}", this, key);
       info = waitable.await();
     } catch (InterruptedException e) {
       LOG.info(this + ": interrupted while waiting for " + key);
@@ -765,9 +749,7 @@
     // Handle loading a new replica.
     ShortCircuitReplicaInfo info = null;
     try {
-      if (LOG.isTraceEnabled()) {
-        LOG.trace(this + ": loading " + key);
-      }
+      LOG.trace("{}: loading {}", this, key);
       info = creator.createShortCircuitReplicaInfo();
     } catch (RuntimeException e) {
       LOG.warn(this + ": failed to load " + key, e);
@@ -777,9 +759,7 @@
     try {
       if (info.getReplica() != null) {
         // On success, make sure the cache cleaner thread is running.
-        if (LOG.isTraceEnabled()) {
-          LOG.trace(this + ": successfully loaded " + info.getReplica());
-        }
+        LOG.trace("{}: successfully loaded {}", this, info.getReplica());
         startCacheCleanerThreadIfNeeded();
         // Note: new ShortCircuitReplicas start with a refCount of 2,
         // indicating that both this cache and whoever requested the 
@@ -811,10 +791,8 @@
           cleanerExecutor.scheduleAtFixedRate(cacheCleaner, rateMs, rateMs,
               TimeUnit.MILLISECONDS);
       cacheCleaner.setFuture(future);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(this + ": starting cache cleaner thread which will run " +
-          "every " + rateMs + " ms");
-      }
+      LOG.debug("{}: starting cache cleaner thread which will run every {} ms",
+          this, rateMs);
     }
   }
 
@@ -832,17 +810,12 @@
           long lastAttemptTimeMs = (Long)replica.mmapData;
           long delta = Time.monotonicNow() - lastAttemptTimeMs;
           if (delta < mmapRetryTimeoutMs) {
-            if (LOG.isTraceEnabled()) {
-              LOG.trace(this + ": can't create client mmap for " +
-                  replica + " because we failed to " +
-                  "create one just " + delta + "ms ago.");
-            }
+            LOG.trace("{}: can't create client mmap for {} because we failed to"
+                + " create one just {}ms ago.", this, replica, delta);
             return null;
           }
-          if (LOG.isTraceEnabled()) {
-            LOG.trace(this + ": retrying client mmap for " + replica +
-                ", " + delta + " ms after the previous failure.");
-          }
+          LOG.trace("{}: retrying client mmap for {}, {} ms after the previous "
+              + "failure.", this, replica, delta);
         } else if (replica.mmapData instanceof Condition) {
           Condition cond = (Condition)replica.mmapData;
           cond.awaitUninterruptibly();
@@ -965,38 +938,10 @@
           }
         }
       }
-      if (LOG.isDebugEnabled()) {
-        StringBuilder builder = new StringBuilder();
-        builder.append("visiting ").append(visitor.getClass().getName()).
-            append("with outstandingMmapCount=").append(outstandingMmapCount).
-            append(", replicas=");
-        String prefix = "";
-        for (Entry<ExtendedBlockId, ShortCircuitReplica> entry : replicas.entrySet()) {
-          builder.append(prefix).append(entry.getValue());
-          prefix = ",";
-        }
-        prefix = "";
-        builder.append(", failedLoads=");
-        for (Entry<ExtendedBlockId, InvalidToken> entry : failedLoads.entrySet()) {
-          builder.append(prefix).append(entry.getValue());
-          prefix = ",";
-        }
-        prefix = "";
-        builder.append(", evictable=");
-        for (Entry<Long, ShortCircuitReplica> entry : evictable.entrySet()) {
-          builder.append(prefix).append(entry.getKey()).
-              append(":").append(entry.getValue());
-          prefix = ",";
-        }
-        prefix = "";
-        builder.append(", evictableMmapped=");
-        for (Entry<Long, ShortCircuitReplica> entry : evictableMmapped.entrySet()) {
-          builder.append(prefix).append(entry.getKey()).
-              append(":").append(entry.getValue());
-          prefix = ",";
-        }
-        LOG.debug(builder.toString());
-      }
+      LOG.debug("visiting {} with outstandingMmapCount={}, replicas={}, "
+          + "failedLoads={}, evictable={}, evictableMmapped={}",
+          visitor.getClass().getName(), outstandingMmapCount, replicas,
+          failedLoads, evictable, evictableMmapped);
       visitor.visit(outstandingMmapCount, replicas, failedLoads,
             evictable, evictableMmapped);
     } finally {
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitReplica.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitReplica.java
index 37566e2..38cf22b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitReplica.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitReplica.java
@@ -154,25 +154,19 @@
       // Check staleness by looking at the shared memory area we use to
       // communicate with the DataNode.
       boolean stale = !slot.isValid();
-      if (LOG.isTraceEnabled()) {
-        LOG.trace(this + ": checked shared memory segment.  isStale=" + stale);
-      }
+      LOG.trace("{}: checked shared memory segment.  isStale={}", this, stale);
       return stale;
     } else {
       // Fall back to old, time-based staleness method.
       long deltaMs = Time.monotonicNow() - creationTimeMs;
       long staleThresholdMs = cache.getStaleThresholdMs();
       if (deltaMs > staleThresholdMs) {
-        if (LOG.isTraceEnabled()) {
-          LOG.trace(this + " is stale because it's " + deltaMs +
-              " ms old, and staleThresholdMs = " + staleThresholdMs);
-        }
+        LOG.trace("{} is stale because it's {} ms old and staleThreadholdMS={}",
+            this, deltaMs, staleThresholdMs);
         return true;
       } else {
-        if (LOG.isTraceEnabled()) {
-          LOG.trace(this + " is not stale because it's only " + deltaMs +
-              " ms old, and staleThresholdMs = " + staleThresholdMs);
-        }
+        LOG.trace("{} is not stale because it's only {} ms old "
+            + "and staleThresholdMs={}",  this, deltaMs, staleThresholdMs);
         return false;
       }
     }
@@ -194,13 +188,8 @@
       return false;
     }
     boolean result = slot.addAnchor();
-    if (LOG.isTraceEnabled()) {
-      if (result) {
-        LOG.trace(this + ": added no-checksum anchor to slot " + slot);
-      } else {
-        LOG.trace(this + ": could not add no-checksum anchor to slot " + slot);
-      }
-    }
+    LOG.trace("{}: {} no-checksum anchor to slot {}",
+        this, result ? "added" : "could not add", slot);
     return result;
   }
 
@@ -263,9 +252,7 @@
         suffix += "  scheduling " + slot + " for later release.";
       }
     }
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("closed " + this + suffix);
-    }
+    LOG.trace("closed {}{}", this, suffix);
   }
 
   public FileInputStream getDataStream() {
@@ -293,9 +280,7 @@
       FileChannel channel = dataStream.getChannel();
       MappedByteBuffer mmap = channel.map(MapMode.READ_ONLY, 0, 
           Math.min(Integer.MAX_VALUE, channel.size()));
-      if (LOG.isTraceEnabled()) {
-        LOG.trace(this + ": created mmap of size " + channel.size());
-      }
+      LOG.trace("{}: created mmap of size {}", this, channel.size());
       return mmap;
     } catch (IOException e) {
       LOG.warn(this + ": mmap error", e);
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitShm.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitShm.java
index 78325a3..fa40c15 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitShm.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitShm.java
@@ -484,13 +484,9 @@
         POSIX.MMAP_PROT_READ | POSIX.MMAP_PROT_WRITE, true, mmappedLength);
     this.slots = new Slot[mmappedLength / BYTES_PER_SLOT];
     this.allocatedSlots = new BitSet(slots.length);
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("creating " + this.getClass().getSimpleName() +
-          "(shmId=" + shmId +
-          ", mmappedLength=" + mmappedLength +
-          ", baseAddress=" + String.format("%x", baseAddress) +
-          ", slots.length=" + slots.length + ")");
-    }
+    LOG.trace("creating {}(shmId={}, mmappedLength={}, baseAddress={}, "
+        + "slots.length={})", this.getClass().getSimpleName(), shmId,
+        mmappedLength, String.format("%x", baseAddress), slots.length);
   }
 
   public final ShmId getShmId() {
@@ -615,9 +611,7 @@
         "tried to unregister slot " + slotIdx + ", which was not registered.");
     allocatedSlots.set(slotIdx, false);
     slots[slotIdx] = null;
-    if (LOG.isTraceEnabled()) {
-      LOG.trace(this + ": unregisterSlot " + slotIdx);
-    }
+    LOG.trace("{}: unregisterSlot {}", this, slotIdx);
   }
   
   /**
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/ByteArrayManager.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/ByteArrayManager.java
index a9adb7e..e361252 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/ByteArrayManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/ByteArrayManager.java
@@ -36,18 +36,6 @@
 @InterfaceAudience.Private
 public abstract class ByteArrayManager {
   static final Logger LOG = LoggerFactory.getLogger(ByteArrayManager.class);
-  private static final ThreadLocal<StringBuilder> DEBUG_MESSAGE =
-      new ThreadLocal<StringBuilder>() {
-    protected StringBuilder initialValue() {
-      return new StringBuilder();
-    }
-  };
-
-  private static void logDebugMessage() {
-    final StringBuilder b = DEBUG_MESSAGE.get();
-    LOG.debug(b.toString());
-    b.setLength(0);
-  }
 
   static final int MIN_ARRAY_LENGTH = 32;
   static final byte[] EMPTY_BYTE_ARRAY = {};
@@ -160,27 +148,18 @@
      * via the {@link FixedLengthManager#recycle(byte[])} method.
      */
     synchronized byte[] allocate() throws InterruptedException {
-      if (LOG.isDebugEnabled()) {
-        DEBUG_MESSAGE.get().append(", ").append(this);
-      }
+      LOG.debug(", {}", this);
       for(; numAllocated >= maxAllocated;) {
-        if (LOG.isDebugEnabled()) {
-          DEBUG_MESSAGE.get().append(": wait ...");
-          logDebugMessage();
-        }
+        LOG.debug(": wait ...");
 
         wait();
 
-        if (LOG.isDebugEnabled()) {
-          DEBUG_MESSAGE.get().append("wake up: ").append(this);
-        }
+        LOG.debug("wake up: {}", this);
       }
       numAllocated++;
 
       final byte[] array = freeQueue.poll();
-      if (LOG.isDebugEnabled()) {
-        DEBUG_MESSAGE.get().append(", recycled? ").append(array != null);
-      }
+      LOG.debug(", recycled? {}", array != null);
       return array != null? array : new byte[byteArrayLength];
     }
 
@@ -194,9 +173,7 @@
     synchronized int recycle(byte[] array) {
       Preconditions.checkNotNull(array);
       Preconditions.checkArgument(array.length == byteArrayLength);
-      if (LOG.isDebugEnabled()) {
-        DEBUG_MESSAGE.get().append(", ").append(this);
-      }
+      LOG.debug(", {}", this);
 
       notify();
       numAllocated--;
@@ -207,9 +184,7 @@
       }
 
       if (freeQueue.size() < maxAllocated - numAllocated) {
-        if (LOG.isDebugEnabled()) {
-          DEBUG_MESSAGE.get().append(", freeQueue.offer");
-        }
+        LOG.debug(", freeQueue.offer");
         freeQueue.offer(array);
       }
       return freeQueue.size();
@@ -349,9 +324,7 @@
     public byte[] newByteArray(final int arrayLength)
         throws InterruptedException {
       Preconditions.checkArgument(arrayLength >= 0);
-      if (LOG.isDebugEnabled()) {
-        DEBUG_MESSAGE.get().append("allocate(").append(arrayLength).append(")");
-      }
+      LOG.debug("allocate({})", arrayLength);
 
       final byte[] array;
       if (arrayLength == 0) {
@@ -365,18 +338,12 @@
         final FixedLengthManager manager =
             managers.get(powerOfTwo, aboveThreshold);
 
-        if (LOG.isDebugEnabled()) {
-          DEBUG_MESSAGE.get().append(": count=").append(count)
-              .append(aboveThreshold? ", aboveThreshold": ", belowThreshold");
-        }
+        LOG.debug(": count={}, {}Threshold", count,
+            aboveThreshold ? "above" : "below");
         array = manager != null? manager.allocate(): new byte[powerOfTwo];
       }
 
-      if (LOG.isDebugEnabled()) {
-        DEBUG_MESSAGE.get().append(", return byte[")
-            .append(array.length).append("]");
-        logDebugMessage();
-      }
+      LOG.debug(", return byte[{}]", array.length);
       return array;
     }
 
@@ -391,10 +358,7 @@
     @Override
     public int release(final byte[] array) {
       Preconditions.checkNotNull(array);
-      if (LOG.isDebugEnabled()) {
-        DEBUG_MESSAGE.get()
-            .append("recycle: array.length=").append(array.length);
-      }
+      LOG.debug("recycle: array.length={}", array.length);
 
       final int freeQueueSize;
       if (array.length == 0) {
@@ -404,10 +368,7 @@
         freeQueueSize = manager == null? -1: manager.recycle(array);
       }
 
-      if (LOG.isDebugEnabled()) {
-        DEBUG_MESSAGE.get().append(", freeQueueSize=").append(freeQueueSize);
-        logDebugMessage();
-      }
+      LOG.debug(", freeQueueSize={}", freeQueueSize);
       return freeQueueSize;
     }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/TokenAspect.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/TokenAspect.java
index a864d37..870103ed 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/TokenAspect.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/TokenAspect.java
@@ -134,9 +134,7 @@
       if (token != null) {
         fs.setDelegationToken(token);
         addRenewAction(fs);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Created new DT for {}", token.getService());
-        }
+        LOG.debug("Created new DT for {}", token.getService());
       }
       hasInitedToken = true;
     }
@@ -149,9 +147,7 @@
   synchronized void initDelegationToken(UserGroupInformation ugi) {
     Token<?> token = selectDelegationToken(ugi);
     if (token != null) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Found existing DT for {}", token.getService());
-      }
+      LOG.debug("Found existing DT for {}", token.getService());
       fs.setDelegationToken(token);
       hasInitedToken = true;
     }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/URLConnectionFactory.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/URLConnectionFactory.java
index 4c23241..be5f17d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/URLConnectionFactory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/URLConnectionFactory.java
@@ -182,9 +182,7 @@
   public URLConnection openConnection(URL url, boolean isSpnego)
       throws IOException, AuthenticationException {
     if (isSpnego) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("open AuthenticatedURL connection {}", url);
-      }
+      LOG.debug("open AuthenticatedURL connection {}", url);
       UserGroupInformation.getCurrentUser().checkTGTAndReloginFromKeytab();
       final AuthenticatedURL.Token authToken = new AuthenticatedURL.Token();
       return new AuthenticatedURL(new KerberosUgiAuthenticator(),
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
index e245d2a..e122748 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
@@ -233,16 +233,12 @@
       // refetch tokens.  even if ugi has credentials, don't attempt
       // to get another token to match hdfs/rpc behavior
       if (token != null) {
-        if(LOG.isDebugEnabled()) {
-          LOG.debug("Using UGI token: {}", token);
-        }
+        LOG.debug("Using UGI token: {}", token);
         canRefreshDelegationToken = false;
       } else {
         token = getDelegationToken(null);
         if (token != null) {
-          if(LOG.isDebugEnabled()) {
-            LOG.debug("Fetched new token: {}", token);
-          }
+          LOG.debug("Fetched new token: {}", token);
         } else { // security is disabled
           canRefreshDelegationToken = false;
         }
@@ -257,9 +253,7 @@
     boolean replaced = false;
     if (canRefreshDelegationToken) {
       Token<?> token = getDelegationToken(null);
-      if(LOG.isDebugEnabled()) {
-        LOG.debug("Replaced expired token: {}", token);
-      }
+      LOG.debug("Replaced expired token: {}", token);
       setDelegationToken(token);
       replaced = (token != null);
     }
@@ -442,9 +436,7 @@
     InetSocketAddress nnAddr = getCurrentNNAddr();
     final URL url = new URL(getTransportScheme(), nnAddr.getHostName(),
           nnAddr.getPort(), path + '?' + query);
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("url={}", url);
-    }
+    LOG.trace("url={}", url);
     return url;
   }
 
@@ -479,9 +471,7 @@
         + Param.toSortedString("&", getAuthParameters(op))
         + Param.toSortedString("&", parameters);
     final URL url = getNamenodeURL(path, query);
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("url={}", url);
-    }
+    LOG.trace("url={}", url);
     return url;
   }
 
@@ -769,9 +759,7 @@
       } catch (Exception e) { // catch json parser errors
         final IOException ioe =
             new IOException("Response decoding failure: "+e.toString(), e);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Response decoding failure: {}", e.toString(), e);
-        }
+        LOG.debug("Response decoding failure.", e);
         throw ioe;
       } finally {
         conn.disconnect();
@@ -1242,9 +1230,7 @@
         cancelDelegationToken(delegationToken);
       }
     } catch (IOException ioe) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Token cancel failed: ", ioe);
-      }
+      LOG.debug("Token cancel failed: ", ioe);
     } finally {
       super.close();
     }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem
index 3c832de..abe2bfc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem
@@ -13,5 +13,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+org.apache.hadoop.hdfs.DistributedFileSystem
 org.apache.hadoop.hdfs.web.WebHdfsFileSystem
 org.apache.hadoop.hdfs.web.SWebHdfsFileSystem
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index d55beae..cedf1a7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -991,6 +991,18 @@
     HDFS-9148. Incorrect assert message in TestWriteToReplica#testWriteToTemporary
     (Tony Wu via lei)
 
+    HDFS-8859. Improve DataNode ReplicaMap memory footprint to save about 45%.
+    (yliu)
+
+    HDFS-9165. Move entries in META-INF/services/o.a.h.fs.FileSystem to
+    hdfs-client. (Mingliang Liu via wheat9)
+
+    HDFS-8696. Make the lower and higher watermark in the DN Netty server
+    configurable. (Xiaobing Zhou via wheat9)
+
+    HDFS-8971. Remove guards when calling LOG.debug() and LOG.trace() in client
+    package. (Mingliang Liu via wheat9)
+
   OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
@@ -1450,6 +1462,15 @@
     HDFS-9092. Nfs silently drops overlapping write requests and causes data
     copying to fail. (Yongjun Zhang)
 
+    HDFS-9141. Thread leak in Datanode#refreshVolumes. (Uma Maheswara Rao G
+    via yliu)
+
+    HDFS-9174. Fix findbugs warnings in FSOutputSummer.tracer and
+    DirectoryScanner$ReportCompiler.currentThread. (Yi Liu via wheat9)
+
+    HDFS-9001. DFSUtil.getNsServiceRpcUris() can return too many entries in a
+    non-HA, non-federated cluster. (Daniel Templeton via atm)
+
 Release 2.7.2 - UNRELEASED
 
   INCOMPATIBLE CHANGES
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 0166029..0d24c8f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -64,9 +64,12 @@
       HdfsClientConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT;
   public static final String DFS_WEBHDFS_ACL_PERMISSION_PATTERN_DEFAULT =
       HdfsClientConfigKeys.DFS_WEBHDFS_ACL_PERMISSION_PATTERN_DEFAULT;
-
-  public static final String DFS_CLIENT_HTRACE_SAMPLER_CLASSES =
-      "dfs.client.htrace.sampler.classes";
+  public static final String  DFS_WEBHDFS_NETTY_LOW_WATERMARK =
+      "dfs.webhdfs.netty.low.watermark";
+  public static final int  DFS_WEBHDFS_NETTY_LOW_WATERMARK_DEFAULT = 32768;
+  public static final String  DFS_WEBHDFS_NETTY_HIGH_WATERMARK =
+      "dfs.webhdfs.netty.high.watermark";
+  public static final int  DFS_WEBHDFS_NETTY_HIGH_WATERMARK_DEFAULT = 65535;
 
   // HA related configuration
   public static final String  DFS_DATANODE_RESTART_REPLICA_EXPIRY_KEY = "dfs.datanode.restart.replica.expiration";
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
index b0ea7ce..2309843 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
@@ -734,20 +734,29 @@
         }
       }
     }
-    
-    // Add the default URI if it is an HDFS URI.
-    URI defaultUri = FileSystem.getDefaultUri(conf);
-    // checks if defaultUri is ip:port format
-    // and convert it to hostname:port format
-    if (defaultUri != null && (defaultUri.getPort() != -1)) {
-      defaultUri = createUri(defaultUri.getScheme(),
-          NetUtils.createSocketAddr(defaultUri.getHost(), 
-              defaultUri.getPort()));
-    }
-    if (defaultUri != null &&
-        HdfsConstants.HDFS_URI_SCHEME.equals(defaultUri.getScheme()) &&
-        !nonPreferredUris.contains(defaultUri)) {
-      ret.add(defaultUri);
+
+    // Add the default URI if it is an HDFS URI and we haven't come up with a
+    // valid non-nameservice NN address yet.  Consider the servicerpc-address
+    // and rpc-address to be the "unnamed" nameservice.  defaultFS is our
+    // fallback when rpc-address isn't given.  We therefore only want to add
+    // the defaultFS when neither the servicerpc-address (which is preferred)
+    // nor the rpc-address (which overrides defaultFS) is given.
+    if (!uriFound) {
+      URI defaultUri = FileSystem.getDefaultUri(conf);
+
+      // checks if defaultUri is ip:port format
+      // and convert it to hostname:port format
+      if (defaultUri != null && (defaultUri.getPort() != -1)) {
+        defaultUri = createUri(defaultUri.getScheme(),
+            NetUtils.createSocketAddr(defaultUri.getHost(),
+                defaultUri.getPort()));
+      }
+
+      if (defaultUri != null &&
+          HdfsConstants.HDFS_URI_SCHEME.equals(defaultUri.getScheme()) &&
+          !nonPreferredUris.contains(defaultUri)) {
+        ret.add(defaultUri);
+      }
     }
     
     return ret;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 43ddf74..10a8a9c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -25,9 +25,7 @@
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_PERMISSION_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DNS_INTERFACE_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DNS_INTERFACE_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DNS_NAMESERVER_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DNS_NAMESERVER_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HANDLER_COUNT_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HANDLER_COUNT_KEY;
@@ -606,7 +604,7 @@
   private synchronized void refreshVolumes(String newVolumes) throws IOException {
     Configuration conf = getConf();
     conf.set(DFS_DATANODE_DATA_DIR_KEY, newVolumes);
-
+    ExecutorService service = null;
     int numOldDataDirs = dataDirs.size();
     ChangedVolumes changedVolumes = parseChangedVolumes(newVolumes);
     StringBuilder errorMessageBuilder = new StringBuilder();
@@ -629,8 +627,8 @@
         for (BPOfferService bpos : blockPoolManager.getAllNamenodeThreads()) {
           nsInfos.add(bpos.getNamespaceInfo());
         }
-        ExecutorService service = Executors.newFixedThreadPool(
-            changedVolumes.newLocations.size());
+        service = Executors
+            .newFixedThreadPool(changedVolumes.newLocations.size());
         List<Future<IOException>> exceptions = Lists.newArrayList();
         for (final StorageLocation location : changedVolumes.newLocations) {
           exceptions.add(service.submit(new Callable<IOException>() {
@@ -680,6 +678,9 @@
         throw new IOException(errorMessageBuilder.toString());
       }
     } finally {
+      if (service != null) {
+        service.shutdown();
+      }
       conf.set(DFS_DATANODE_DATA_DIR_KEY,
           Joiner.on(",").join(effectiveVolumes));
       dataDirs = getStorageLocations(conf);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
index b8ea5bf..392c121 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
@@ -785,12 +785,6 @@
     private final StopWatch perfTimer = new StopWatch();
 
     /**
-     * The associated thread.  Used for testing purposes only.
-     */
-    @VisibleForTesting
-    Thread currentThread;
-
-    /**
      * Create a report compiler for the given volume on the given datanode.
      *
      * @param datanode the target datanode
@@ -809,8 +803,6 @@
      */
     @Override
     public ScanInfoPerBlockPool call() throws IOException {
-      currentThread = Thread.currentThread();
-
       String[] bpList = volume.getBlockPoolList();
       ScanInfoPerBlockPool result = new ScanInfoPerBlockPool(bpList.length);
       for (String bpid : bpList) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
index 31b14fa..d19e656 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
@@ -18,20 +18,13 @@
 package org.apache.hadoop.hdfs.server.datanode;
 
 import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.HardLink;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
-import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.LightWeightResizableGSet;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -40,8 +33,12 @@
  * It provides a general interface for meta information of a replica.
  */
 @InterfaceAudience.Private
-abstract public class ReplicaInfo extends Block implements Replica {
-  
+abstract public class ReplicaInfo extends Block
+    implements Replica, LightWeightResizableGSet.LinkedElement {
+
+  /** For implementing {@link LightWeightResizableGSet.LinkedElement} interface */
+  private LightWeightResizableGSet.LinkedElement next;
+
   /** volume where the replica belongs */
   private FsVolumeSpi volume;
   
@@ -229,4 +226,14 @@
   public boolean isOnTransientStorage() {
     return volume.isTransientStorage();
   }
+
+  @Override
+  public LightWeightResizableGSet.LinkedElement getNext() {
+    return next;
+  }
+
+  @Override
+  public void setNext(LightWeightResizableGSet.LinkedElement next) {
+    this.next = next;
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
index 68c951a..571f085 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
@@ -743,7 +743,12 @@
       // Now it is safe to add the replica into volumeMap
       // In case of any exception during parsing this cache file, fall back
       // to scan all the files on disk.
-      for (ReplicaInfo info: tmpReplicaMap.replicas(bpid)) {
+      for (Iterator<ReplicaInfo> iter =
+          tmpReplicaMap.replicas(bpid).iterator(); iter.hasNext(); ) {
+        ReplicaInfo info = iter.next();
+        // We use a lightweight GSet to store replicaInfo, we need to remove
+        // it from one GSet before adding to another.
+        iter.remove();
         volumeMap.add(bpid, info);
       }
       LOG.info("Successfully read replica from cache file : " 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java
index 617e0fd..6f0b8a7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java
@@ -24,6 +24,7 @@
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
+import org.apache.hadoop.util.LightWeightResizableGSet;
 
 /**
  * Maintains the replica map. 
@@ -33,9 +34,9 @@
   private final Object mutex;
   
   // Map of block pool Id to another map of block Id to ReplicaInfo.
-  private final Map<String, Map<Long, ReplicaInfo>> map =
-    new HashMap<String, Map<Long, ReplicaInfo>>();
-  
+  private final Map<String, LightWeightResizableGSet<Block, ReplicaInfo>> map =
+    new HashMap<String, LightWeightResizableGSet<Block, ReplicaInfo>>();
+
   ReplicaMap(Object mutex) {
     if (mutex == null) {
       throw new HadoopIllegalArgumentException(
@@ -91,8 +92,8 @@
   ReplicaInfo get(String bpid, long blockId) {
     checkBlockPool(bpid);
     synchronized(mutex) {
-      Map<Long, ReplicaInfo> m = map.get(bpid);
-      return m != null ? m.get(blockId) : null;
+      LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
+      return m != null ? m.get(new Block(blockId)) : null;
     }
   }
   
@@ -108,13 +109,13 @@
     checkBlockPool(bpid);
     checkBlock(replicaInfo);
     synchronized(mutex) {
-      Map<Long, ReplicaInfo> m = map.get(bpid);
+      LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
       if (m == null) {
         // Add an entry for block pool if it does not exist already
-        m = new HashMap<Long, ReplicaInfo>();
+        m = new LightWeightResizableGSet<Block, ReplicaInfo>();
         map.put(bpid, m);
       }
-      return  m.put(replicaInfo.getBlockId(), replicaInfo);
+      return  m.put(replicaInfo);
     }
   }
 
@@ -137,14 +138,13 @@
     checkBlockPool(bpid);
     checkBlock(block);
     synchronized(mutex) {
-      Map<Long, ReplicaInfo> m = map.get(bpid);
+      LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
       if (m != null) {
-        Long key = Long.valueOf(block.getBlockId());
-        ReplicaInfo replicaInfo = m.get(key);
+        ReplicaInfo replicaInfo = m.get(block);
         if (replicaInfo != null &&
             block.getGenerationStamp() == replicaInfo.getGenerationStamp()) {
-          return m.remove(key);
-        } 
+          return m.remove(block);
+        }
       }
     }
     
@@ -160,9 +160,9 @@
   ReplicaInfo remove(String bpid, long blockId) {
     checkBlockPool(bpid);
     synchronized(mutex) {
-      Map<Long, ReplicaInfo> m = map.get(bpid);
+      LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
       if (m != null) {
-        return m.remove(blockId);
+        return m.remove(new Block(blockId));
       }
     }
     return null;
@@ -174,7 +174,7 @@
    * @return the number of replicas in the map
    */
   int size(String bpid) {
-    Map<Long, ReplicaInfo> m = null;
+    LightWeightResizableGSet<Block, ReplicaInfo> m = null;
     synchronized(mutex) {
       m = map.get(bpid);
       return m != null ? m.size() : 0;
@@ -192,7 +192,7 @@
    * @return a collection of the replicas belonging to the block pool
    */
   Collection<ReplicaInfo> replicas(String bpid) {
-    Map<Long, ReplicaInfo> m = null;
+    LightWeightResizableGSet<Block, ReplicaInfo> m = null;
     m = map.get(bpid);
     return m != null ? m.values() : null;
   }
@@ -200,10 +200,10 @@
   void initBlockPool(String bpid) {
     checkBlockPool(bpid);
     synchronized(mutex) {
-      Map<Long, ReplicaInfo> m = map.get(bpid);
+      LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
       if (m == null) {
         // Add an entry for block pool if it does not exist already
-        m = new HashMap<Long, ReplicaInfo>();
+        m = new LightWeightResizableGSet<Block, ReplicaInfo>();
         map.put(bpid, m);
       }
     }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/DatanodeHttpServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/DatanodeHttpServer.java
index 62c98e7..441d520 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/DatanodeHttpServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/DatanodeHttpServer.java
@@ -21,6 +21,7 @@
 import io.netty.channel.ChannelFactory;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
 import io.netty.channel.ChannelPipeline;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
@@ -30,10 +31,12 @@
 import io.netty.handler.codec.http.HttpResponseEncoder;
 import io.netty.handler.ssl.SslHandler;
 import io.netty.handler.stream.ChunkedWriteHandler;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.server.common.JspHelper;
 import org.apache.hadoop.hdfs.server.datanode.BlockScanner;
@@ -117,6 +120,18 @@
               conf, confForCreate));
         }
       });
+
+      this.httpServer.childOption(
+          ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK,
+          conf.getInt(
+              DFSConfigKeys.DFS_WEBHDFS_NETTY_HIGH_WATERMARK,
+              DFSConfigKeys.DFS_WEBHDFS_NETTY_HIGH_WATERMARK_DEFAULT));
+      this.httpServer.childOption(
+          ChannelOption.WRITE_BUFFER_LOW_WATER_MARK,
+          conf.getInt(
+              DFSConfigKeys.DFS_WEBHDFS_NETTY_LOW_WATERMARK,
+              DFSConfigKeys.DFS_WEBHDFS_NETTY_LOW_WATERMARK_DEFAULT));
+
       if (externalHttpChannel == null) {
         httpServer.channel(NioServerSocketChannel.class);
       } else {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index e9363b4..328b3c0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -420,7 +420,7 @@
         UnresolvedPathException.class);
     clientRpcServer.setTracer(nn.tracer);
     if (serviceRpcServer != null) {
-      clientRpcServer.setTracer(nn.tracer);
+      serviceRpcServer.setTracer(nn.tracer);
     }
  }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
index c6a2118..404a71e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
@@ -81,6 +81,7 @@
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.tracing.TraceUtils;
 import org.apache.hadoop.util.Time;
 import org.apache.htrace.core.Tracer;
 
@@ -206,7 +207,9 @@
     this.staleInterval =
         conf.getLong(DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY,
           DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT);
-    this.tracer = new Tracer.Builder("NamenodeFsck").build();
+    this.tracer = new Tracer.Builder("NamenodeFsck").
+        conf(TraceUtils.wrapHadoopConf("namenode.fsck.htrace.", conf)).
+        build();
 
     for (Iterator<String> it = pmap.keySet().iterator(); it.hasNext();) {
       String key = it.next();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem
deleted file mode 100644
index 120ff94..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem
+++ /dev/null
@@ -1,16 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-org.apache.hadoop.hdfs.DistributedFileSystem
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 96ffec0..1475ecd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -2428,14 +2428,6 @@
 </property>
 
 <property>
-  <name>dfs.client.htrace.sampler.classes</name>
-  <value></value>
-  <description>
-    The class names of the HTrace Samplers to use for the HDFS client.
-  </description>
-</property>
-
-<property>
   <name>dfs.ha.zkfc.nn.http.timeout.ms</name>
   <value>20000</value>
   <description>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java
index d878fc1..19b8c69 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java
@@ -616,78 +616,142 @@
         DFSUtil.substituteForWildcardAddress("127.0.0.1:12345", "foo"));
   }
   
+  /**
+   * Test how name service URIs are handled with a variety of configuration
+   * settings
+   * @throws Exception
+   */
   @Test
   public void testGetNNUris() throws Exception {
     HdfsConfiguration conf = new HdfsConfiguration();
-    
+
     final String NS1_NN1_ADDR   = "ns1-nn1.example.com:8020";
     final String NS1_NN2_ADDR   = "ns1-nn2.example.com:8020";
     final String NS2_NN_ADDR    = "ns2-nn.example.com:8020";
     final String NN1_ADDR       = "nn.example.com:8020";
     final String NN1_SRVC_ADDR  = "nn.example.com:8021";
     final String NN2_ADDR       = "nn2.example.com:8020";
-    
+
+    conf.set(DFS_NAMESERVICES, "ns1");
+    conf.set(DFSUtil.addKeySuffixes(
+        DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, "ns1"), NS1_NN1_ADDR);
+
+    conf.set(DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, "hdfs://" + NN2_ADDR);
+    conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, "hdfs://" + NN1_ADDR);
+
+    Collection<URI> uris = DFSUtil.getNameServiceUris(conf,
+        DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY,  DFS_NAMENODE_RPC_ADDRESS_KEY);
+
+    assertEquals("Incorrect number of URIs returned", 2, uris.size());
+    assertTrue("Missing URI for name service ns1",
+        uris.contains(new URI("hdfs://" + NS1_NN1_ADDR)));
+    assertTrue("Missing URI for service address",
+        uris.contains(new URI("hdfs://" + NN2_ADDR)));
+
+    conf = new HdfsConfiguration();
     conf.set(DFS_NAMESERVICES, "ns1,ns2");
-    conf.set(DFSUtil.addKeySuffixes(DFS_HA_NAMENODES_KEY_PREFIX, "ns1"),"nn1,nn2");
+    conf.set(DFSUtil.addKeySuffixes(DFS_HA_NAMENODES_KEY_PREFIX, "ns1"),
+        "nn1,nn2");
     conf.set(DFSUtil.addKeySuffixes(
         DFS_NAMENODE_RPC_ADDRESS_KEY, "ns1", "nn1"), NS1_NN1_ADDR);
     conf.set(DFSUtil.addKeySuffixes(
         DFS_NAMENODE_RPC_ADDRESS_KEY, "ns1", "nn2"), NS1_NN2_ADDR);
-    
-    conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, "ns2"),
-        NS2_NN_ADDR);
-    
+
+    conf.set(DFSUtil.addKeySuffixes(
+        DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, "ns2"), NS2_NN_ADDR);
+
     conf.set(DFS_NAMENODE_RPC_ADDRESS_KEY, "hdfs://" + NN1_ADDR);
-    
+
     conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, "hdfs://" + NN2_ADDR);
-    
-    Collection<URI> uris = DFSUtil.getNameServiceUris(conf,
+
+    uris = DFSUtil.getNameServiceUris(conf,
         DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY,  DFS_NAMENODE_RPC_ADDRESS_KEY);
-    
-    assertEquals(4, uris.size());
-    assertTrue(uris.contains(new URI("hdfs://ns1")));
-    assertTrue(uris.contains(new URI("hdfs://" + NS2_NN_ADDR)));
-    assertTrue(uris.contains(new URI("hdfs://" + NN1_ADDR)));
-    assertTrue(uris.contains(new URI("hdfs://" + NN2_ADDR)));
-    
+
+    assertEquals("Incorrect number of URIs returned", 3, uris.size());
+    assertTrue("Missing URI for name service ns1",
+        uris.contains(new URI("hdfs://ns1")));
+    assertTrue("Missing URI for name service ns2",
+        uris.contains(new URI("hdfs://" + NS2_NN_ADDR)));
+    assertTrue("Missing URI for RPC address",
+        uris.contains(new URI("hdfs://" + NN1_ADDR)));
+
     // Make sure that non-HDFS URIs in fs.defaultFS don't get included.
     conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY,
         "viewfs://vfs-name.example.com");
-    
-    uris = DFSUtil.getNameServiceUris(conf, DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, 
-        DFS_NAMENODE_RPC_ADDRESS_KEY);
-    
-    assertEquals(3, uris.size());
-    assertTrue(uris.contains(new URI("hdfs://ns1")));
-    assertTrue(uris.contains(new URI("hdfs://" + NS2_NN_ADDR)));
-    assertTrue(uris.contains(new URI("hdfs://" + NN1_ADDR)));
-    
+
+    uris = DFSUtil.getNameServiceUris(conf,
+        DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, DFS_NAMENODE_RPC_ADDRESS_KEY);
+
+    assertEquals("Incorrect number of URIs returned", 3, uris.size());
+    assertTrue("Missing URI for name service ns1",
+        uris.contains(new URI("hdfs://ns1")));
+    assertTrue("Missing URI for name service ns2",
+        uris.contains(new URI("hdfs://" + NS2_NN_ADDR)));
+    assertTrue("Missing URI for RPC address",
+        uris.contains(new URI("hdfs://" + NN1_ADDR)));
+
     // Make sure that an HA URI being the default URI doesn't result in multiple
     // entries being returned.
     conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, "hdfs://ns1");
     
-    uris = DFSUtil.getNameServiceUris(conf, DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, 
-        DFS_NAMENODE_RPC_ADDRESS_KEY);
-    
-    assertEquals(3, uris.size());
-    assertTrue(uris.contains(new URI("hdfs://ns1")));
-    assertTrue(uris.contains(new URI("hdfs://" + NS2_NN_ADDR)));
-    assertTrue(uris.contains(new URI("hdfs://" + NN1_ADDR)));
-    
+    uris = DFSUtil.getNameServiceUris(conf,
+        DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, DFS_NAMENODE_RPC_ADDRESS_KEY);
+
+    assertEquals("Incorrect number of URIs returned", 3, uris.size());
+    assertTrue("Missing URI for name service ns1",
+        uris.contains(new URI("hdfs://ns1")));
+    assertTrue("Missing URI for name service ns2",
+        uris.contains(new URI("hdfs://" + NS2_NN_ADDR)));
+    assertTrue("Missing URI for RPC address",
+        uris.contains(new URI("hdfs://" + NN1_ADDR)));
+
+    // Check that the default URI is returned if there's nothing else to return.
+    conf = new HdfsConfiguration();
+    conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, "hdfs://" + NN1_ADDR);
+
+    uris = DFSUtil.getNameServiceUris(conf,
+        DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, DFS_NAMENODE_RPC_ADDRESS_KEY);
+
+    assertEquals("Incorrect number of URIs returned", 1, uris.size());
+    assertTrue("Missing URI for RPC address (defaultFS)",
+        uris.contains(new URI("hdfs://" + NN1_ADDR)));
+
+    // Check that the RPC address is the only address returned when the RPC
+    // and the default FS is given.
+    conf.set(DFS_NAMENODE_RPC_ADDRESS_KEY, NN2_ADDR);
+
+    uris = DFSUtil.getNameServiceUris(conf,
+        DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, DFS_NAMENODE_RPC_ADDRESS_KEY);
+
+    assertEquals("Incorrect number of URIs returned", 1, uris.size());
+    assertTrue("Missing URI for RPC address",
+        uris.contains(new URI("hdfs://" + NN2_ADDR)));
+
     // Make sure that when a service RPC address is used that is distinct from
     // the client RPC address, and that client RPC address is also used as the
     // default URI, that the client URI does not end up in the set of URIs
     // returned.
+    conf.set(DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, NN1_ADDR);
+
+    uris = DFSUtil.getNameServiceUris(conf,
+        DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, DFS_NAMENODE_RPC_ADDRESS_KEY);
+
+    assertEquals("Incorrect number of URIs returned", 1, uris.size());
+    assertTrue("Missing URI for service ns1",
+        uris.contains(new URI("hdfs://" + NN1_ADDR)));
+
+    // Check that when the default FS and service address are given, but
+    // the RPC address isn't, that only the service address is returned.
     conf = new HdfsConfiguration();
     conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, "hdfs://" + NN1_ADDR);
-    conf.set(DFS_NAMENODE_RPC_ADDRESS_KEY, NN1_ADDR);
     conf.set(DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, NN1_SRVC_ADDR);
     
-    uris = DFSUtil.getNameServiceUris(conf, DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, 
-        DFS_NAMENODE_RPC_ADDRESS_KEY);
-    
-    assertEquals(1, uris.size());
-    assertTrue(uris.contains(new URI("hdfs://" + NN1_SRVC_ADDR)));
+    uris = DFSUtil.getNameServiceUris(conf,
+        DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, DFS_NAMENODE_RPC_ADDRESS_KEY);
+
+    assertEquals("Incorrect number of URIs returned", 1, uris.size());
+    assertTrue("Missing URI for service address",
+        uris.contains(new URI("hdfs://" + NN1_SRVC_ADDR)));
   }
 
   @Test (timeout=15000)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java
index ce7aee3..8e65ff6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java
@@ -32,6 +32,7 @@
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FsTracer;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.BlockReader;
 import org.apache.hadoop.hdfs.BlockReaderFactory;
@@ -164,6 +165,7 @@
           setCachingStrategy(CachingStrategy.newDefaultStrategy()).
           setClientCacheContext(ClientContext.getFromConf(conf)).
           setConfiguration(conf).
+          setTracer(FsTracer.get(conf)).
           setRemotePeerFactory(new RemotePeerFactory() {
             @Override
             public Peer newConnectedPeer(InetSocketAddress addr,
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
index cb50edc..2c4fcc5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
@@ -37,6 +37,7 @@
 import org.apache.hadoop.conf.ReconfigurationException;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.FsTracer;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.BlockReader;
 import org.apache.hadoop.hdfs.BlockReaderFactory;
@@ -515,6 +516,7 @@
       setCachingStrategy(CachingStrategy.newDefaultStrategy()).
       setClientCacheContext(ClientContext.getFromConf(conf)).
       setConfiguration(conf).
+      setTracer(FsTracer.get(conf)).
       setRemotePeerFactory(new RemotePeerFactory() {
         @Override
         public Peer newConnectedPeer(InetSocketAddress addr,
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tracing/TestTraceAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tracing/TestTraceAdmin.java
index b08866b5..7e10d90 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tracing/TestTraceAdmin.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tracing/TestTraceAdmin.java
@@ -73,10 +73,10 @@
       Assert.assertEquals("ret:0, [no span receivers found]" + NEWLINE,
           runTraceCommand(trace, "-list", "-host", getHostPortForNN(cluster)));
       Assert.assertEquals("ret:0, Added trace span receiver 1 with " +
-          "configuration dfs.htrace.local-file-span-receiver.path = " + tracePath + NEWLINE,
+          "configuration hadoop.htrace.local.file.span.receiver.path = " + tracePath + NEWLINE,
           runTraceCommand(trace, "-add", "-host", getHostPortForNN(cluster),
               "-class", "org.apache.htrace.core.LocalFileSpanReceiver",
-              "-Cdfs.htrace.local-file-span-receiver.path=" + tracePath));
+              "-Chadoop.htrace.local.file.span.receiver.path=" + tracePath));
       String list =
           runTraceCommand(trace, "-list", "-host", getHostPortForNN(cluster));
       Assert.assertTrue(list.startsWith("ret:0"));
@@ -87,10 +87,10 @@
       Assert.assertEquals("ret:0, [no span receivers found]" + NEWLINE,
           runTraceCommand(trace, "-list", "-host", getHostPortForNN(cluster)));
       Assert.assertEquals("ret:0, Added trace span receiver 2 with " +
-          "configuration dfs.htrace.local-file-span-receiver.path = " + tracePath + NEWLINE,
+          "configuration hadoop.htrace.local.file.span.receiver.path = " + tracePath + NEWLINE,
           runTraceCommand(trace, "-add", "-host", getHostPortForNN(cluster),
               "-class", "LocalFileSpanReceiver",
-              "-Cdfs.htrace.local-file-span-receiver.path=" + tracePath));
+              "-Chadoop.htrace.local.file.span.receiver.path=" + tracePath));
       Assert.assertEquals("ret:0, Removed trace span receiver 2" + NEWLINE,
           runTraceCommand(trace, "-remove", "2", "-host",
               getHostPortForNN(cluster)));
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index e9d04d3..bf3ac12 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -509,6 +509,9 @@
     YARN-3635. Refactored current queue mapping implementation in CapacityScheduler
     to use a generic PlacementManager framework. (Wangda Tan via jianhe)
 
+    YARN-4066. Large number of queues choke fair scheduler.
+    (Johan Gustavsson via kasha)
+
   BUG FIXES
 
     YARN-3197. Confusing log generated by CapacityScheduler. (Varun Saxena 
@@ -1002,6 +1005,9 @@
     YARN-4180. AMLauncher does not retry on failures when talking to NM.
     (adhoot)
 
+    YARN-3727. For better error recovery, check if the directory exists before
+    using it for localization. (Zhihai Xu via jlowe)
+
 Release 2.7.1 - 2015-07-06
 
   INCOMPATIBLE CHANGES
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java
index 14ec911..56e3de5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java
@@ -33,7 +33,8 @@
 
   boolean remove(LocalizedResource req, DeletionService delService);
 
-  Path getPathForLocalization(LocalResourceRequest req, Path localDirPath);
+  Path getPathForLocalization(LocalResourceRequest req, Path localDirPath,
+      DeletionService delService);
 
   String getUser();
 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
index a1e6817..51dbcaa 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
@@ -440,10 +440,12 @@
    * @param {@link LocalResourceRequest} Resource localization request to
    *        localize the resource.
    * @param {@link Path} local directory path
+   * @param {@link DeletionService} Deletion Service to delete existing
+   *        path for localization.
    */
   @Override
-  public Path
-      getPathForLocalization(LocalResourceRequest req, Path localDirPath) {
+  public Path getPathForLocalization(LocalResourceRequest req,
+      Path localDirPath, DeletionService delService) {
     Path rPath = localDirPath;
     if (useLocalCacheDirectoryManager && localDirPath != null) {
 
@@ -463,8 +465,22 @@
       inProgressLocalResourcesMap.put(req, rPath);
     }
 
-    rPath = new Path(rPath,
-        Long.toString(uniqueNumberGenerator.incrementAndGet()));
+    while (true) {
+      Path uniquePath = new Path(rPath,
+          Long.toString(uniqueNumberGenerator.incrementAndGet()));
+      File file = new File(uniquePath.toUri().getRawPath());
+      if (!file.exists()) {
+        rPath = uniquePath;
+        break;
+      }
+      // If the directory already exists, delete it and move to next one.
+      LOG.warn("Directory " + uniquePath + " already exists, " +
+          "try next one.");
+      if (delService != null) {
+        delService.delete(getUser(), uniquePath);
+      }
+    }
+
     Path localPath = new Path(rPath, req.getPath().getName());
     LocalizedResource rsrc = localrsrc.get(req);
     rsrc.setLocalPath(localPath);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
index e239e34..2cc5683 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
@@ -830,7 +830,8 @@
                     + ContainerLocalizer.FILECACHE,
                   ContainerLocalizer.getEstimatedSize(resource), true);
             Path publicDirDestPath =
-                publicRsrc.getPathForLocalization(key, publicRootPath);
+                publicRsrc.getPathForLocalization(key, publicRootPath,
+                    delService);
             if (!publicDirDestPath.getParent().equals(publicRootPath)) {
               DiskChecker.checkDir(new File(publicDirDestPath.toUri().getPath()));
             }
@@ -1116,7 +1117,7 @@
           dirsHandler.getLocalPathForWrite(cacheDirectory,
             ContainerLocalizer.getEstimatedSize(rsrc), false);
       return tracker.getPathForLocalization(new LocalResourceRequest(rsrc),
-          dirPath);
+          dirPath, delService);
     }
 
     @Override
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java
index 350cecb..e6aeae0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java
@@ -35,6 +35,7 @@
 import java.util.concurrent.ConcurrentMap;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -427,7 +428,7 @@
       // Simulate the process of localization of lr1
       // NOTE: Localization path from tracker has resource ID at end
       Path hierarchicalPath1 =
-          tracker.getPathForLocalization(lr1, localDir).getParent();
+          tracker.getPathForLocalization(lr1, localDir, null).getParent();
       // Simulate lr1 getting localized
       ResourceLocalizedEvent rle1 =
           new ResourceLocalizedEvent(lr1,
@@ -444,7 +445,7 @@
       tracker.handle(reqEvent2);
 
       Path hierarchicalPath2 =
-          tracker.getPathForLocalization(lr2, localDir).getParent();
+          tracker.getPathForLocalization(lr2, localDir, null).getParent();
       // localization failed.
       ResourceFailedLocalizationEvent rfe2 =
           new ResourceFailedLocalizationEvent(
@@ -463,7 +464,7 @@
           LocalResourceVisibility.PUBLIC, lc1);
       tracker.handle(reqEvent3);
       Path hierarchicalPath3 =
-          tracker.getPathForLocalization(lr3, localDir).getParent();
+          tracker.getPathForLocalization(lr3, localDir, null).getParent();
       // localization successful
       ResourceLocalizedEvent rle3 =
           new ResourceLocalizedEvent(lr3, new Path(hierarchicalPath3.toUri()
@@ -542,7 +543,8 @@
       dispatcher.await();
 
       // Simulate the process of localization of lr1
-      Path hierarchicalPath1 = tracker.getPathForLocalization(lr1, localDir);
+      Path hierarchicalPath1 = tracker.getPathForLocalization(lr1, localDir,
+          null);
 
       ArgumentCaptor<LocalResourceProto> localResourceCaptor =
           ArgumentCaptor.forClass(LocalResourceProto.class);
@@ -622,7 +624,8 @@
       dispatcher.await();
 
       // Simulate the process of localization of lr1
-      Path hierarchicalPath1 = tracker.getPathForLocalization(lr1, localDir);
+      Path hierarchicalPath1 = tracker.getPathForLocalization(lr1, localDir,
+          null);
 
       ArgumentCaptor<LocalResourceProto> localResourceCaptor =
           ArgumentCaptor.forClass(LocalResourceProto.class);
@@ -691,7 +694,8 @@
           LocalResourceVisibility.APPLICATION, lc2);
       tracker.handle(reqEvent2);
       dispatcher.await();
-      Path hierarchicalPath2 = tracker.getPathForLocalization(lr2, localDir);
+      Path hierarchicalPath2 = tracker.getPathForLocalization(lr2, localDir,
+          null);
       long localizedId2 = Long.parseLong(hierarchicalPath2.getName());
       Assert.assertEquals(localizedId1 + 1, localizedId2);
     } finally {
@@ -785,6 +789,49 @@
     }
   }
 
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testGetPathForLocalization() throws Exception {
+    FileContext lfs = FileContext.getLocalFSFileContext();
+    Path base_path = new Path("target",
+        TestLocalResourcesTrackerImpl.class.getSimpleName());
+    final String user = "someuser";
+    final ApplicationId appId = ApplicationId.newInstance(1, 1);
+    Configuration conf = new YarnConfiguration();
+    DrainDispatcher dispatcher = null;
+    dispatcher = createDispatcher(conf);
+    EventHandler<LocalizerEvent> localizerEventHandler =
+        mock(EventHandler.class);
+    EventHandler<LocalizerEvent> containerEventHandler =
+        mock(EventHandler.class);
+    dispatcher.register(LocalizerEventType.class, localizerEventHandler);
+    dispatcher.register(ContainerEventType.class, containerEventHandler);
+    NMStateStoreService stateStore = mock(NMStateStoreService.class);
+    DeletionService delService = mock(DeletionService.class);
+    try {
+      LocalResourceRequest req1 = createLocalResourceRequest(user, 1, 1,
+          LocalResourceVisibility.PUBLIC);
+      LocalizedResource lr1 = createLocalizedResource(req1, dispatcher);
+      ConcurrentMap<LocalResourceRequest, LocalizedResource> localrsrc =
+          new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>();
+      localrsrc.put(req1, lr1);
+      LocalResourcesTrackerImpl tracker = new LocalResourcesTrackerImpl(user,
+          appId, dispatcher, localrsrc, true, conf, stateStore, null);
+      Path conflictPath = new Path(base_path, "10");
+      Path qualifiedConflictPath = lfs.makeQualified(conflictPath);
+      lfs.mkdir(qualifiedConflictPath, null, true);
+      Path rPath = tracker.getPathForLocalization(req1, base_path,
+          delService);
+      Assert.assertFalse(lfs.util().exists(rPath));
+      verify(delService, times(1)).delete(eq(user), eq(conflictPath));
+    } finally {
+      lfs.delete(base_path, true);
+      if (dispatcher != null) {
+        dispatcher.stop();
+      }
+    }
+  }
+
   @SuppressWarnings("unchecked")
   @Test
   public void testResourcePresentInGoodDir() throws IOException {
@@ -832,8 +879,10 @@
       tracker.handle(req21Event);
       dispatcher.await();
       // Localize resource1
-      Path p1 = tracker.getPathForLocalization(req1, new Path("/tmp/somedir1"));
-      Path p2 = tracker.getPathForLocalization(req2, new Path("/tmp/somedir2"));
+      Path p1 = tracker.getPathForLocalization(req1,
+          new Path("/tmp/somedir1"), null);
+      Path p2 = tracker.getPathForLocalization(req2,
+          new Path("/tmp/somedir2"), null);
       ResourceLocalizedEvent rle1 = new ResourceLocalizedEvent(req1, p1, 1);
       tracker.handle(rle1);
       ResourceLocalizedEvent rle2 = new ResourceLocalizedEvent(req2, p2, 1);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
index c515506..e7a9db8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
@@ -624,29 +624,31 @@
       // Simulate start of localization for all resources
       privTracker1.getPathForLocalization(privReq1,
           dirsHandler.getLocalPathForWrite(
-              ContainerLocalizer.USERCACHE + user1));
+              ContainerLocalizer.USERCACHE + user1), null);
       privTracker1.getPathForLocalization(privReq2,
           dirsHandler.getLocalPathForWrite(
-              ContainerLocalizer.USERCACHE + user1));
+              ContainerLocalizer.USERCACHE + user1), null);
       LocalizedResource privLr1 = privTracker1.getLocalizedResource(privReq1);
       LocalizedResource privLr2 = privTracker1.getLocalizedResource(privReq2);
       appTracker1.getPathForLocalization(appReq1,
           dirsHandler.getLocalPathForWrite(
-              ContainerLocalizer.APPCACHE + appId1));
+              ContainerLocalizer.APPCACHE + appId1), null);
       LocalizedResource appLr1 = appTracker1.getLocalizedResource(appReq1);
       appTracker2.getPathForLocalization(appReq2,
           dirsHandler.getLocalPathForWrite(
-              ContainerLocalizer.APPCACHE + appId2));
+              ContainerLocalizer.APPCACHE + appId2), null);
       LocalizedResource appLr2 = appTracker2.getLocalizedResource(appReq2);
       appTracker2.getPathForLocalization(appReq3,
           dirsHandler.getLocalPathForWrite(
-              ContainerLocalizer.APPCACHE + appId2));
+              ContainerLocalizer.APPCACHE + appId2), null);
       LocalizedResource appLr3 = appTracker2.getLocalizedResource(appReq3);
       pubTracker.getPathForLocalization(pubReq1,
-          dirsHandler.getLocalPathForWrite(ContainerLocalizer.FILECACHE));
+          dirsHandler.getLocalPathForWrite(ContainerLocalizer.FILECACHE),
+          null);
       LocalizedResource pubLr1 = pubTracker.getLocalizedResource(pubReq1);
       pubTracker.getPathForLocalization(pubReq2,
-          dirsHandler.getLocalPathForWrite(ContainerLocalizer.FILECACHE));
+          dirsHandler.getLocalPathForWrite(ContainerLocalizer.FILECACHE),
+          null);
       LocalizedResource pubLr2 = pubTracker.getLocalizedResource(pubReq2);
 
       // Simulate completion of localization for most resources with
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
index 0092845..51a298b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
@@ -87,7 +87,19 @@
    * could be referred to as just "parent1.queue2".
    */
   public FSLeafQueue getLeafQueue(String name, boolean create) {
-    FSQueue queue = getQueue(name, create, FSQueueType.LEAF);
+    return getLeafQueue(name, create, true);
+  }
+
+  public FSLeafQueue getLeafQueue(
+      String name,
+      boolean create,
+      boolean recomputeSteadyShares) {
+    FSQueue queue = getQueue(
+        name,
+        create,
+        FSQueueType.LEAF,
+        recomputeSteadyShares
+    );
     if (queue instanceof FSParentQueue) {
       return null;
     }
@@ -117,28 +129,46 @@
    * could be referred to as just "parent1.queue2".
    */
   public FSParentQueue getParentQueue(String name, boolean create) {
-    FSQueue queue = getQueue(name, create, FSQueueType.PARENT);
+    return getParentQueue(name, create, true);
+  }
+
+  public FSParentQueue getParentQueue(
+      String name,
+      boolean create,
+      boolean recomputeSteadyShares) {
+    FSQueue queue = getQueue(
+        name,
+        create,
+        FSQueueType.PARENT,
+        recomputeSteadyShares
+    );
     if (queue instanceof FSLeafQueue) {
       return null;
     }
     return (FSParentQueue) queue;
   }
-  
-  private FSQueue getQueue(String name, boolean create, FSQueueType queueType) {
+
+  private FSQueue getQueue(
+      String name,
+      boolean create,
+      FSQueueType queueType,
+      boolean recomputeSteadyShares) {
+    boolean recompute = recomputeSteadyShares;
     name = ensureRootPrefix(name);
+    FSQueue queue;
     synchronized (queues) {
-      FSQueue queue = queues.get(name);
+      queue = queues.get(name);
       if (queue == null && create) {
         // if the queue doesn't exist,create it and return
         queue = createQueue(name, queueType);
-
-        // Update steady fair share for all queues
-        if (queue != null) {
-          rootQueue.recomputeSteadyShares();
-        }
+      } else {
+        recompute = false;
       }
-      return queue;
     }
+    if (recompute) {
+      rootQueue.recomputeSteadyShares();
+    }
+    return queue;
   }
   
   /**
@@ -376,21 +406,25 @@
   
   public void updateAllocationConfiguration(AllocationConfiguration queueConf) {
     // Create leaf queues and the parent queues in a leaf's ancestry if they do not exist
-    for (String name : queueConf.getConfiguredQueues().get(FSQueueType.LEAF)) {
-      if (removeEmptyIncompatibleQueues(name, FSQueueType.LEAF)) {
-        getLeafQueue(name, true);
+    synchronized (queues) {
+      for (String name : queueConf.getConfiguredQueues().get(
+              FSQueueType.LEAF)) {
+        if (removeEmptyIncompatibleQueues(name, FSQueueType.LEAF)) {
+          getLeafQueue(name, true, false);
+        }
+      }
+      // At this point all leaves and 'parents with
+      // at least one child' would have been created.
+      // Now create parents with no configured leaf.
+      for (String name : queueConf.getConfiguredQueues().get(
+          FSQueueType.PARENT)) {
+        if (removeEmptyIncompatibleQueues(name, FSQueueType.PARENT)) {
+          getParentQueue(name, true, false);
+        }
       }
     }
+    rootQueue.recomputeSteadyShares();
 
-    // At this point all leaves and 'parents with at least one child' would have been created.
-    // Now create parents with no configured leaf.
-    for (String name : queueConf.getConfiguredQueues().get(
-        FSQueueType.PARENT)) {
-      if (removeEmptyIncompatibleQueues(name, FSQueueType.PARENT)) {
-        getParentQueue(name, true);
-      }
-    }
-    
     for (FSQueue queue : queues.values()) {
       // Update queue metrics
       FSQueueMetrics queueMetrics = queue.getMetrics();