HDFS-14575. LeaseRenewer#daemon threads leak in DFSClient. Contributed by Renukaprasad C.
Co-authored-by: Tao Yang <taoyang1@apache.org>
Reviewed-by: He Xiaoqiao <hexiaoqiao@apache.org>
Reviewed-by: Wei-Chiu Chuang <weichiu@apache.org>
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index 3fa4dd0..56adc5c 100755
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -505,7 +505,15 @@
throws IOException {
synchronized (filesBeingWritten) {
putFileBeingWritten(inodeId, out);
- getLeaseRenewer().put(this);
+ LeaseRenewer renewer = getLeaseRenewer();
+ boolean result = renewer.put(this);
+ if (!result) {
+ // The existing LeaseRenewer cannot start another daemon, so remove
+ // it from the factory and register with a newly created one.
+ LeaseRenewer.remove(renewer);
+ renewer = getLeaseRenewer();
+ renewer.put(this);
+ }
}
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/LeaseRenewer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/LeaseRenewer.java
index d108af9..6b4c899 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/LeaseRenewer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/LeaseRenewer.java
@@ -26,6 +26,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -79,6 +80,8 @@
private static long leaseRenewerGraceDefault = 60*1000L;
static final long LEASE_RENEWER_SLEEP_DEFAULT = 1000L;
+  /**
+   * Whether this renewer has already created its daemon thread. Once set,
+   * {@link #put(DFSClient)} returns false instead of starting another daemon.
+   * The reference itself is never reassigned, hence final.
+   */
+  private final AtomicBoolean isLSRunning = new AtomicBoolean(false);
+
/** Get a {@link LeaseRenewer} instance */
public static LeaseRenewer getInstance(final String authority,
final UserGroupInformation ugi, final DFSClient dfsc) {
@@ -88,6 +91,15 @@
}
/**
+ * Remove the given renewer from the Factory.
+ * A subsequent {@link #getInstance} call will receive a new
+ * {@link LeaseRenewer} instance.
+ * @param renewer Instance to be cleared from Factory
+ */
+ public static void remove(LeaseRenewer renewer) {
+ Factory.INSTANCE.remove(renewer);
+ }
+
+ /**
* A factory for sharing {@link LeaseRenewer} objects
* among {@link DFSClient} instances
* so that there is only one renewer per authority per user.
@@ -156,6 +168,9 @@
final LeaseRenewer stored = renewers.get(r.factorykey);
//Since a renewer may expire, the stored renewer can be different.
if (r == stored) {
+ // Expire LeaseRenewer daemon thread as soon as possible.
+ r.clearClients();
+ r.setEmptyTime(0);
renewers.remove(r.factorykey);
}
}
@@ -241,6 +256,10 @@
}
}
+ /** Remove every entry from {@code dfsclients} under this renewer's lock. */
+ private synchronized void clearClients() {
+ dfsclients.clear();
+ }
+
private synchronized boolean clientsRunning() {
for(Iterator<DFSClient> i = dfsclients.iterator(); i.hasNext(); ) {
if (!i.next().isClientRunning()) {
@@ -292,11 +311,18 @@
&& Time.monotonicNow() - emptyTime > gracePeriod;
}
- public synchronized void put(final DFSClient dfsc) {
+ public synchronized boolean put(final DFSClient dfsc) {
if (dfsc.isClientRunning()) {
if (!isRunning() || isRenewerExpired()) {
- //start a new deamon with a new id.
+ // Start a new daemon with a new id.
final int id = ++currentId;
+ if (isLSRunning.get()) {
+ // Only one daemon is allowed per LeaseRenewer, so let the client
+ // create a new LeaseRenewer and continue to acquire the lease.
+ return false;
+ }
+ isLSRunning.getAndSet(true);
+
daemon = new Daemon(new Runnable() {
@Override
public void run() {
@@ -328,6 +354,7 @@
}
emptyTime = Long.MAX_VALUE;
}
+ return true;
}
@VisibleForTesting
@@ -426,9 +453,6 @@
synchronized (this) {
DFSClientFaultInjector.get().delayWhenRenewLeaseTimeout();
dfsclientsCopy = new ArrayList<>(dfsclients);
- dfsclients.clear();
- //Expire the current LeaseRenewer thread.
- emptyTime = 0;
Factory.INSTANCE.remove(LeaseRenewer.this);
}
for (DFSClient dfsClient : dfsclientsCopy) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/client/impl/TestLeaseRenewer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/client/impl/TestLeaseRenewer.java
index 1ffec85e..f1a11ed 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/client/impl/TestLeaseRenewer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/client/impl/TestLeaseRenewer.java
@@ -31,7 +31,11 @@
import org.mockito.stubbing.Answer;
import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadInfo;
+import java.lang.management.ThreadMXBean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.regex.Pattern;
import static org.junit.Assert.assertSame;
@@ -168,6 +172,11 @@
renewer.closeClient(mockClient1);
renewer.closeClient(mockClient2);
+ renewer.closeClient(MOCK_DFSCLIENT);
+
+ // Make sure the renewer has stopped running after expiring.
+ Thread.sleep(FAST_GRACE_PERIOD * 2);
+ Assert.assertTrue(!renewer.isRunning());
}
@Test
@@ -197,4 +206,82 @@
Assert.assertFalse(renewer.isRunning());
}
+  /**
+   * Test for HDFS-14575. In this fix, the LeaseRenewer clears all clients
+   * and expires immediately via setting empty time to 0 before it's removed
+   * from factory. Previously, LeaseRenewer#daemon thread might leak.
+   *
+   * @throws Exception if the test is interrupted while sleeping or a
+   *           renewer call fails
+   */
+  @Test
+  public void testDaemonThreadLeak() throws Exception {
+    Assert.assertFalse("Renewer not initially running", renewer.isRunning());
+
+    // Pretend to create a file#1, daemon#1 starts
+    renewer.put(MOCK_DFSCLIENT);
+    Assert.assertTrue("Renewer should have started running",
+        renewer.isRunning());
+    Pattern daemonThreadNamePattern = Pattern.compile("LeaseRenewer:\\S+");
+    Assert.assertEquals(1, countThreadMatching(daemonThreadNamePattern));
+
+    // Look the renewer up again; the factory must return the same instance
+    LeaseRenewer lastRenewer = renewer;
+    renewer =
+        LeaseRenewer.getInstance(FAKE_AUTHORITY, FAKE_UGI_A, MOCK_DFSCLIENT);
+    Assert.assertEquals(lastRenewer, renewer);
+
+    // Pretend to close file#1
+    renewer.closeClient(MOCK_DFSCLIENT);
+    Assert.assertEquals(1, countThreadMatching(daemonThreadNamePattern));
+
+    // Pretend to be expired
+    renewer.setEmptyTime(0);
+
+    // Pretend to create file#2. If put() refuses to start a second daemon on
+    // this renewer, swap it for a new one the same way DFSClient does.
+    renewer =
+        LeaseRenewer.getInstance(FAKE_AUTHORITY, FAKE_UGI_A, MOCK_DFSCLIENT);
+    renewer.setGraceSleepPeriod(FAST_GRACE_PERIOD);
+    boolean success = renewer.put(MOCK_DFSCLIENT);
+    if (!success) {
+      LeaseRenewer.remove(renewer);
+      renewer =
+          LeaseRenewer.getInstance(FAKE_AUTHORITY, FAKE_UGI_A, MOCK_DFSCLIENT);
+      renewer.setGraceSleepPeriod(FAST_GRACE_PERIOD);
+      renewer.put(MOCK_DFSCLIENT);
+    }
+
+    int threadCount = countThreadMatching(daemonThreadNamePattern);
+    // Sometimes the old LeaseRenewer daemon has already exited, which leads
+    // to a count of 1 (rare scenario). Report the count if neither holds.
+    Assert.assertTrue(
+        "Unexpected LeaseRenewer daemon thread count: " + threadCount,
+        1 == threadCount || 2 == threadCount);
+
+    // After grace period, both daemon#1 and renewer#1 will be removed due to
+    // expiration, then daemon#2 will leak before HDFS-14575.
+    Thread.sleep(FAST_GRACE_PERIOD * 2);
+
+    // Pretend to close file#2; the factory must still return the renewer
+    // that file#2 was registered with.
+    lastRenewer = renewer;
+    renewer =
+        LeaseRenewer.getInstance(FAKE_AUTHORITY, FAKE_UGI_A, MOCK_DFSCLIENT);
+    Assert.assertEquals(lastRenewer, renewer);
+    renewer.setGraceSleepPeriod(FAST_GRACE_PERIOD);
+    renewer.closeClient(MOCK_DFSCLIENT);
+    renewer.setEmptyTime(0);
+    // Make sure LeaseRenewer#daemon threads will terminate after grace period
+    Thread.sleep(FAST_GRACE_PERIOD * 2);
+    Assert.assertEquals("LeaseRenewer#daemon thread leaks", 0,
+        countThreadMatching(daemonThreadNamePattern));
+  }
+
+  /**
+   * Count the JVM threads whose name fully matches the given pattern.
+   *
+   * @param pattern regular expression that the whole thread name must match
+   * @return number of threads reported by the ThreadMXBean whose name matches
+   */
+  private static int countThreadMatching(Pattern pattern) {
+    final ThreadMXBean mxBean = ManagementFactory.getThreadMXBean();
+    final long[] threadIds = mxBean.getAllThreadIds();
+    int matched = 0;
+    for (ThreadInfo threadInfo : mxBean.getThreadInfo(threadIds, 1)) {
+      // An entry is null when its thread is no longer alive or does not exist.
+      if (threadInfo != null
+          && pattern.matcher(threadInfo.getThreadName()).matches()) {
+        matched++;
+      }
+    }
+    return matched;
+  }
}