HADOOP-11932. MetricsSinkAdapter may hang  when being stopped. Contributed by Brahma Reddy Battula
(cherry picked from commit f59612edd74d1bef2b60870c24c1f67c56b2b3cb)
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index 3cbc5d5..bfad714 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -581,6 +581,9 @@
     HADOOP-12304. Applications using FileContext fail with the default file
     system configured to be wasb/s3/etc. (cnauroth)
 
+    HADOOP-11932. MetricsSinkAdapter may hang  when being stopped.
+    (Brahma Reddy Battula via jianhe)
+
 Release 2.7.1 - 2015-07-06 
 
   INCOMPATIBLE CHANGES
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSinkAdapter.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSinkAdapter.java
index 478c316..62498ea 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSinkAdapter.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSinkAdapter.java
@@ -209,15 +209,15 @@
   void stop() {
     stopping = true;
     sinkThread.interrupt();
+    if (sink instanceof Closeable) {
+      IOUtils.cleanup(LOG, (Closeable)sink);
+    }
     try {
       sinkThread.join();
     }
     catch (InterruptedException e) {
       LOG.warn("Stop interrupted", e);
     }
-    if (sink instanceof Closeable) {
-      IOUtils.cleanup(LOG, (Closeable)sink);
-    }
   }
 
   String name() {
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestMetricsSystemImpl.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestMetricsSystemImpl.java
index 0f7b15f..6238d79 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestMetricsSystemImpl.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestMetricsSystemImpl.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.metrics2.impl;
 
+import java.io.Closeable;
+import java.io.IOException;
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.*;
@@ -484,6 +486,64 @@
     }
   }
 
+  /**
+   * Class to verify HADOOP-11932. Instead of reading from HTTP, going in loop
+   * until closed.
+   */
+  private static class TestClosableSink implements MetricsSink, Closeable {
+
+    boolean closed = false;
+    CountDownLatch collectingLatch;
+
+    public TestClosableSink(CountDownLatch collectingLatch) {
+      this.collectingLatch = collectingLatch;
+    }
+
+    @Override
+    public void init(SubsetConfiguration conf) {
+    }
+
+    @Override
+    public void close() throws IOException {
+      closed = true;
+    }
+
+    @Override
+    public void putMetrics(MetricsRecord record) {
+      while (!closed) {
+        collectingLatch.countDown();
+      }
+    }
+
+    @Override
+    public void flush() {
+    }
+  }
+
+  /**
+   * HADOOP-11932
+   */
+  @Test(timeout = 5000)
+  public void testHangOnSinkRead() throws Exception {
+    new ConfigBuilder().add("*.period", 8)
+        .add("test.sink.test.class", TestSink.class.getName())
+        .save(TestMetricsConfig.getTestFilename("hadoop-metrics2-test"));
+    MetricsSystemImpl ms = new MetricsSystemImpl("Test");
+    ms.start();
+    try {
+      CountDownLatch collectingLatch = new CountDownLatch(1);
+      MetricsSink sink = new TestClosableSink(collectingLatch);
+      ms.registerSink("closeableSink",
+          "The sink will be used to test closeability", sink);
+      // trigger metric collection first time
+      ms.onTimerEvent();
+      // Make sure that sink is collecting metrics
+      assertTrue(collectingLatch.await(1, TimeUnit.SECONDS));
+    } finally {
+      ms.stop();
+    }
+  }
+
   @Metrics(context="test")
   private static class TestSource {
     @Metric("C1 desc") MutableCounterLong c1;