HADOOP-11932. MetricsSinkAdapter may hang when being stopped. Contributed by Brahma Reddy Battula
(cherry picked from commit f59612edd74d1bef2b60870c24c1f67c56b2b3cb)
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index 3cbc5d5..bfad714 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -581,6 +581,9 @@
HADOOP-12304. Applications using FileContext fail with the default file
system configured to be wasb/s3/etc. (cnauroth)
+ HADOOP-11932. MetricsSinkAdapter may hang when being stopped.
+ (Brahma Reddy Battula via jianhe)
+
Release 2.7.1 - 2015-07-06
INCOMPATIBLE CHANGES
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSinkAdapter.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSinkAdapter.java
index 478c316..62498ea 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSinkAdapter.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSinkAdapter.java
@@ -209,15 +209,15 @@
void stop() {
stopping = true;
sinkThread.interrupt();
+ if (sink instanceof Closeable) {
+ IOUtils.cleanup(LOG, (Closeable)sink);
+ }
try {
sinkThread.join();
}
catch (InterruptedException e) {
LOG.warn("Stop interrupted", e);
}
- if (sink instanceof Closeable) {
- IOUtils.cleanup(LOG, (Closeable)sink);
- }
}
String name() {
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestMetricsSystemImpl.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestMetricsSystemImpl.java
index 0f7b15f..6238d79 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestMetricsSystemImpl.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestMetricsSystemImpl.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.metrics2.impl;
+import java.io.Closeable;
+import java.io.IOException;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
@@ -484,6 +486,64 @@
}
}
+ /**
+ * Class to verify HADOOP-11932. Instead of reading from HTTP, going in loop
+ * until closed.
+ */
+ private static class TestClosableSink implements MetricsSink, Closeable {
+
+ boolean closed = false;
+ CountDownLatch collectingLatch;
+
+ public TestClosableSink(CountDownLatch collectingLatch) {
+ this.collectingLatch = collectingLatch;
+ }
+
+ @Override
+ public void init(SubsetConfiguration conf) {
+ }
+
+ @Override
+ public void close() throws IOException {
+ closed = true;
+ }
+
+ @Override
+ public void putMetrics(MetricsRecord record) {
+ while (!closed) {
+ collectingLatch.countDown();
+ }
+ }
+
+ @Override
+ public void flush() {
+ }
+ }
+
+ /**
+ * HADOOP-11932
+ */
+ @Test(timeout = 5000)
+ public void testHangOnSinkRead() throws Exception {
+ new ConfigBuilder().add("*.period", 8)
+ .add("test.sink.test.class", TestSink.class.getName())
+ .save(TestMetricsConfig.getTestFilename("hadoop-metrics2-test"));
+ MetricsSystemImpl ms = new MetricsSystemImpl("Test");
+ ms.start();
+ try {
+ CountDownLatch collectingLatch = new CountDownLatch(1);
+ MetricsSink sink = new TestClosableSink(collectingLatch);
+ ms.registerSink("closeableSink",
+ "The sink will be used to test closeability", sink);
+ // trigger metric collection first time
+ ms.onTimerEvent();
+ // Make sure that sink is collecting metrics
+ assertTrue(collectingLatch.await(1, TimeUnit.SECONDS));
+ } finally {
+ ms.stop();
+ }
+ }
+
@Metrics(context="test")
private static class TestSource {
@Metric("C1 desc") MutableCounterLong c1;