HBASE-22370 ByteBuf LEAK ERROR (#720)
Signed-off-by: Duo Zhang <zhangduo@apache.org>
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
index ea9a0d8..a805659 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
@@ -550,6 +550,10 @@
*/
@Override
public void recoverAndClose(CancelableProgressable reporter) throws IOException {
+ if (buf != null) {
+ buf.release();
+ buf = null;
+ }
datanodeList.forEach(ch -> ch.close());
datanodeList.forEach(ch -> ch.closeFuture().awaitUninterruptibly());
endFileLease(client, fileId);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
index 0ad3613..48ee664 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
@@ -88,6 +88,7 @@
* Cleanup after ourselves... let go of references.
*/
private void cleanup() {
+ this.call.cleanup();
this.call = null;
this.rpcServer = null;
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestCallRunner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestCallRunner.java
index e614c2b..75997af 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestCallRunner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestCallRunner.java
@@ -45,4 +45,30 @@
cr.setStatus(new MonitoredRPCHandlerImpl());
cr.run();
}
+
+ @Test
+ public void testCallCleanup() {
+ RpcServerInterface mockRpcServer = Mockito.mock(RpcServerInterface.class);
+ Mockito.when(mockRpcServer.isStarted()).thenReturn(true);
+ ServerCall mockCall = Mockito.mock(ServerCall.class);
+ Mockito.when(mockCall.disconnectSince()).thenReturn(1L);
+
+ CallRunner cr = new CallRunner(mockRpcServer, mockCall);
+ cr.setStatus(new MonitoredRPCHandlerImpl());
+ cr.run();
+ Mockito.verify(mockCall, Mockito.times(1)).cleanup();
+ }
+
+ @Test
+ public void testCallRunnerDrop() {
+ RpcServerInterface mockRpcServer = Mockito.mock(RpcServerInterface.class);
+ Mockito.when(mockRpcServer.isStarted()).thenReturn(true);
+ ServerCall mockCall = Mockito.mock(ServerCall.class);
+ Mockito.when(mockCall.disconnectSince()).thenReturn(1L);
+
+ CallRunner cr = new CallRunner(mockRpcServer, mockCall);
+ cr.setStatus(new MonitoredRPCHandlerImpl());
+ cr.drop();
+ Mockito.verify(mockCall, Mockito.times(1)).cleanup();
+ }
}