HBASE-22370 ByteBuf LEAK ERROR (#720)

Signed-off-by: Duo Zhang <zhangduo@apache.org>
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
index ea9a0d8..a805659 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
@@ -550,6 +550,10 @@
    */
   @Override
   public void recoverAndClose(CancelableProgressable reporter) throws IOException {
+    if (buf != null) {
+      buf.release();
+      buf = null;
+    }
     datanodeList.forEach(ch -> ch.close());
     datanodeList.forEach(ch -> ch.closeFuture().awaitUninterruptibly());
     endFileLease(client, fileId);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
index 0ad3613..48ee664 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
@@ -88,6 +88,7 @@
    * Cleanup after ourselves... let go of references.
    */
   private void cleanup() {
+    this.call.cleanup();
     this.call = null;
     this.rpcServer = null;
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestCallRunner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestCallRunner.java
index e614c2b..75997af 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestCallRunner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestCallRunner.java
@@ -45,4 +45,30 @@
     cr.setStatus(new MonitoredRPCHandlerImpl());
     cr.run();
   }
+
+  @Test
+  public void testCallCleanup() {
+    RpcServerInterface mockRpcServer = Mockito.mock(RpcServerInterface.class);
+    Mockito.when(mockRpcServer.isStarted()).thenReturn(true);
+    ServerCall mockCall = Mockito.mock(ServerCall.class);
+    Mockito.when(mockCall.disconnectSince()).thenReturn(1L);
+
+    CallRunner cr = new CallRunner(mockRpcServer, mockCall);
+    cr.setStatus(new MonitoredRPCHandlerImpl());
+    cr.run();
+    Mockito.verify(mockCall, Mockito.times(1)).cleanup();
+  }
+
+  @Test
+  public void testCallRunnerDrop() {
+    RpcServerInterface mockRpcServer = Mockito.mock(RpcServerInterface.class);
+    Mockito.when(mockRpcServer.isStarted()).thenReturn(true);
+    ServerCall mockCall = Mockito.mock(ServerCall.class);
+    Mockito.when(mockCall.disconnectSince()).thenReturn(1L);
+
+    CallRunner cr = new CallRunner(mockRpcServer, mockCall);
+    cr.setStatus(new MonitoredRPCHandlerImpl());
+    cr.drop();
+    Mockito.verify(mockCall, Mockito.times(1)).cleanup();
+  }
 }