HDFS-8671 Add client support for HTTP/2 stream channels
diff --git a/hadoop-hdfs-project/hadoop-hdfs/pom.xml b/hadoop-hdfs-project/hadoop-hdfs/pom.xml
index 2cb1716..de6be26 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/pom.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/pom.xml
@@ -199,6 +199,11 @@
       <scope>test</scope>
     </dependency>
     <dependency>
+      <groupId>org.eclipse.jetty.http2</groupId>
+      <artifactId>http2-server</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-kms</artifactId>
       <classifier>classes</classifier>
@@ -221,6 +226,40 @@
       <artifactId>mockserver-netty</artifactId>
       <version>3.9.2</version>
       <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty-buffer</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty-codec</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty-codec-http</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty-common</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty-handler</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty-transport</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty-codec-socks</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>javax.servlet</groupId>
+          <artifactId>javax.servlet-api</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
     <dependency>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index cb05fa9..0366ba5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -1180,4 +1180,11 @@
   @Deprecated
   public static final long    DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_DEFAULT =
       HdfsClientConfigKeys.DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_DEFAULT;
+
+  // HTTP/2 related configs
+  public static final String  DFS_HTTP2_INITIAL_WINDOW_SIZE = "dfs.http2.initial.window.size";
+  public static final int     DFS_HTTP2_INITIAL_WINDOW_SIZE_DEFAULT = 64 * 1024;
+
+  public static final String  DFS_HTTP2_WINDOW_UPDATE_RATIO = "dfs.http2.window.update.ratio";
+  public static final float   DFS_HTTP2_WINDOW_UPDATE_RATIO_DEFAULT = 0.5f;
 }
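
(Editor's illustration, not part of the patch: the two keys above can be tuned through the standard Hadoop Configuration API. The class name and values below are hypothetical.)

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;

    public class Http2ConfExample {

      /** Returns a Configuration with the new HTTP/2 knobs tuned. */
      public static Configuration http2TunedConf() {
        Configuration conf = new Configuration();
        // Raise the per-stream initial flow-control window from the 64 KiB default.
        conf.setInt(DFSConfigKeys.DFS_HTTP2_INITIAL_WINDOW_SIZE, 1024 * 1024);
        // Send a WINDOW_UPDATE once half of the window is consumed (the default).
        conf.setFloat(DFSConfigKeys.DFS_HTTP2_WINDOW_UPDATE_RATIO, 0.5f);
        return conf;
      }
    }
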
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/PortUnificationServerHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/PortUnificationServerHandler.java
index e5c5256..ce8a2cc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/PortUnificationServerHandler.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/PortUnificationServerHandler.java
@@ -24,6 +24,7 @@
 import io.netty.handler.codec.ByteToMessageDecoder;
 import io.netty.handler.codec.http.HttpServerCodec;
 import io.netty.handler.codec.http2.Http2CodecUtil;
+import io.netty.handler.codec.http2.Http2Exception;
 import io.netty.handler.stream.ChunkedWriteHandler;
 
 import java.net.InetSocketAddress;
@@ -66,7 +67,7 @@
         new URLDispatcher(proxyHost, conf, confForCreate));
   }
 
-  private void configureHttp2(ChannelHandlerContext ctx) {
+  private void configureHttp2(ChannelHandlerContext ctx) throws Http2Exception {
     ctx.pipeline().addLast(
       ServerHttp2ConnectionHandler.create(ctx.channel(),
         new ChannelInitializer<Http2StreamChannel>() {
@@ -75,7 +76,7 @@
           protected void initChannel(Http2StreamChannel ch) throws Exception {
             ch.pipeline().addLast(new DtpChannelHandler());
           }
-        }));
+        }, conf));
   }
 
   @Override
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/AbstractHttp2EventListener.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/AbstractHttp2EventListener.java
new file mode 100644
index 0000000..2ab2358
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/AbstractHttp2EventListener.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.web.http2;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.http2.Http2Connection;
+import io.netty.handler.codec.http2.Http2Connection.PropertyKey;
+import io.netty.handler.codec.http2.Http2Error;
+import io.netty.handler.codec.http2.Http2EventAdapter;
+import io.netty.handler.codec.http2.Http2Exception;
+import io.netty.handler.codec.http2.Http2Headers;
+import io.netty.handler.codec.http2.Http2Stream;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Base class of an HTTP/2 FrameListener and EventListener to manage
+ * {@link Http2StreamChannel}s.
+ * <p>
+ * We do not handle onRstStreamRead here because a stream being reset will
+ * also trigger onStreamClosed; upper layers should not rely on a reset event.
+ */
+@InterfaceAudience.Private
+public abstract class AbstractHttp2EventListener extends Http2EventAdapter {
+
+  protected final Channel parentChannel;
+
+  protected final Http2Connection conn;
+
+  protected final PropertyKey subChannelPropKey;
+
+  protected final AtomicInteger numActiveStreams = new AtomicInteger(0);
+
+  protected AbstractHttp2EventListener(Channel parentChannel,
+      Http2Connection conn) {
+    this.parentChannel = parentChannel;
+    this.conn = conn;
+    this.subChannelPropKey = conn.newKey();
+  }
+
+  protected abstract void initChannelOnStreamActive(
+      Http2StreamChannel subChannel);
+
+  @Override
+  public void onStreamActive(final Http2Stream stream) {
+    numActiveStreams.incrementAndGet();
+    Http2StreamChannel subChannel =
+        new Http2StreamChannel(parentChannel, stream);
+    stream.setProperty(subChannelPropKey, subChannel);
+    initChannelOnStreamActive(subChannel);
+  }
+
+  @Override
+  public void onStreamClosed(Http2Stream stream) {
+    numActiveStreams.decrementAndGet();
+    Http2StreamChannel subChannel = stream.removeProperty(subChannelPropKey);
+    if (subChannel != null && subChannel.isRegistered()) {
+      subChannel.setClosed();
+    }
+  }
+
+  private Http2StreamChannel getSubChannel(int streamId) throws Http2Exception {
+    Http2StreamChannel subChannel =
+        conn.stream(streamId).getProperty(subChannelPropKey);
+    if (subChannel == null) {
+      throw Http2Exception.streamError(streamId, Http2Error.INTERNAL_ERROR,
+        "No sub channel found");
+    }
+    return subChannel;
+  }
+
+  private void writeInbound(int streamId, Object msg, boolean endOfStream,
+      int pendingBytes) throws Http2Exception {
+    Http2StreamChannel subChannel = getSubChannel(streamId);
+    subChannel.writeInbound(msg, pendingBytes);
+    if (endOfStream) {
+      subChannel.writeInbound(LastHttp2Message.get(), 0);
+    }
+    if (subChannel.config().isAutoRead()) {
+      subChannel.read();
+    }
+  }
+
+  @Override
+  public void onHeadersRead(ChannelHandlerContext ctx, int streamId,
+      Http2Headers headers, int padding, boolean endOfStream)
+      throws Http2Exception {
+    writeInbound(streamId, headers, endOfStream, 0);
+  }
+
+  @Override
+  public void onHeadersRead(ChannelHandlerContext ctx, int streamId,
+      Http2Headers headers, int streamDependency, short weight,
+      boolean exclusive, int padding, boolean endOfStream)
+      throws Http2Exception {
+    onHeadersRead(ctx, streamId, headers, padding, endOfStream);
+  }
+
+  @Override
+  public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data,
+      int padding, boolean endOfStream) throws Http2Exception {
+    int pendingBytes = data.readableBytes() + padding;
+    writeInbound(streamId, data.retain(), endOfStream, pendingBytes);
+    return 0;
+  }
+
+  public int numActiveStreams() {
+    return numActiveStreams.get();
+  }
+}
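
Worth noting: onDataRead above returns 0, so no bytes are returned to the local flow controller right away. The matching consumeBytes call happens later, in Http2StreamChannel's read task, once the application handlers actually receive the message; this is what turns HTTP/2 flow control into per-stream backpressure. The two halves, condensed from this patch:

    // AbstractHttp2EventListener.onDataRead: queue the frame, consume nothing yet.
    int pendingBytes = data.readableBytes() + padding;
    writeInbound(streamId, data.retain(), endOfStream, pendingBytes);
    return 0; // defer the WINDOW_UPDATE until the application reads

    // Http2StreamChannel.readTask: consume once the message is delivered.
    if (m.length > 0 && localFlowController.consumeBytes(stream, m.length)) {
      http2ConnHandlerCtx.channel().flush(); // push out the pending WINDOW_UPDATE
    }
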
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/ByteBufferReadableInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/ByteBufferReadableInputStream.java
new file mode 100644
index 0000000..a54715a
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/ByteBufferReadableInputStream.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.web.http2;
+
+import java.io.InputStream;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.ByteBufferReadable;
+
+/**
+ * A placeholder that combines {@link InputStream} with {@link ByteBufferReadable}.
+ */
+@InterfaceAudience.Private
+public abstract class ByteBufferReadableInputStream extends InputStream
+    implements ByteBufferReadable {
+}
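
For illustration (hypothetical class, not part of the patch), a concrete implementation only has to bridge the two read styles; here is a trivial subclass backed by a single ByteBuffer:

    import java.nio.ByteBuffer;

    public class SingleBufferInputStream extends ByteBufferReadableInputStream {

      private final ByteBuffer buf;

      public SingleBufferInputStream(ByteBuffer buf) {
        this.buf = buf;
      }

      @Override
      public int read() {
        return buf.hasRemaining() ? buf.get() & 0xFF : -1;
      }

      @Override
      public int read(ByteBuffer bb) {
        if (!buf.hasRemaining()) {
          return -1; // end of stream
        }
        int n = Math.min(bb.remaining(), buf.remaining());
        ByteBuffer slice = buf.duplicate();
        slice.limit(slice.position() + n);
        bb.put(slice);
        buf.position(buf.position() + n);
        return n;
      }
    }
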
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/ClientHttp2ConnectionHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/ClientHttp2ConnectionHandler.java
new file mode 100644
index 0000000..42be10d
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/ClientHttp2ConnectionHandler.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.web.http2;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.ChannelPromiseAggregator;
+import io.netty.handler.codec.UnsupportedMessageTypeException;
+import io.netty.handler.codec.http2.DefaultHttp2Connection;
+import io.netty.handler.codec.http2.Http2Connection;
+import io.netty.handler.codec.http2.Http2ConnectionDecoder;
+import io.netty.handler.codec.http2.Http2ConnectionEncoder;
+import io.netty.handler.codec.http2.Http2ConnectionHandler;
+import io.netty.handler.codec.http2.Http2Exception;
+import io.netty.handler.codec.http2.Http2FrameLogger;
+import io.netty.handler.codec.http2.Http2Headers;
+import io.netty.handler.logging.LogLevel;
+import io.netty.util.concurrent.Promise;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * An {@link Http2ConnectionHandler} used at client side.
+ */
+@InterfaceAudience.Private
+public class ClientHttp2ConnectionHandler extends Http2ConnectionHandler {
+
+  private static final Log LOG = LogFactory
+      .getLog(ClientHttp2ConnectionHandler.class);
+
+  private static final Http2FrameLogger FRAME_LOGGER = new Http2FrameLogger(
+      LogLevel.INFO, ClientHttp2ConnectionHandler.class);
+
+  private AtomicInteger nextStreamId = new AtomicInteger(3);
+
+  private final ClientHttp2EventListener listener;
+
+  private ClientHttp2ConnectionHandler(Http2ConnectionDecoder decoder,
+      Http2ConnectionEncoder encoder) {
+    super(decoder, encoder);
+    this.listener = (ClientHttp2EventListener) decoder.listener();
+  }
+
+  private int nextStreamId() {
+    return nextStreamId.getAndAdd(2);
+  }
+
+  private void writeHeaders(ChannelHandlerContext ctx,
+      Http2ConnectionEncoder encoder, final int streamId, Http2Headers headers,
+      boolean endStream, ChannelPromise promise,
+      final Promise<Http2StreamChannel> callback) {
+    encoder.writeHeaders(ctx, streamId, headers, 0, endStream, promise)
+        .addListener(new ChannelFutureListener() {
+
+          @Override
+          public void operationComplete(ChannelFuture future) throws Exception {
+            if (future.isSuccess()) {
+              callback
+                  .setSuccess(connection().stream(streamId)
+                      .<Http2StreamChannel> getProperty(
+                        listener.subChannelPropKey));
+            } else {
+              callback.setFailure(future.cause());
+            }
+          }
+        });
+  }
+
+  @Override
+  public void write(ChannelHandlerContext ctx, Object msg,
+      ChannelPromise promise) throws Exception {
+    if (msg instanceof StartHttp2StreamRequest) {
+      final StartHttp2StreamRequest request = (StartHttp2StreamRequest) msg;
+      final int streamId = nextStreamId();
+      Http2ConnectionEncoder encoder = encoder();
+      if (request.data.isReadable()) {
+        ChannelPromiseAggregator aggregator =
+            new ChannelPromiseAggregator(promise);
+        ChannelPromise headerPromise = ctx.newPromise();
+        aggregator.add(headerPromise);
+        writeHeaders(ctx, encoder, streamId, request.headers, false,
+          headerPromise, request.promise);
+        ChannelPromise dataPromise = ctx.newPromise();
+        aggregator.add(dataPromise);
+        encoder.writeData(ctx, streamId, request.data, 0, request.endStream,
+          dataPromise);
+      } else {
+        writeHeaders(ctx, encoder, streamId, request.headers,
+          request.endStream, promise, request.promise);
+      }
+    } else {
+      throw new UnsupportedMessageTypeException(msg,
+          StartHttp2StreamRequest.class);
+    }
+  }
+
+  public int numActiveStreams() {
+    return listener.numActiveStreams();
+  }
+
+  public int currentStreamId() {
+    return nextStreamId.get();
+  }
+
+  private static final Http2Util.Http2ConnectionHandlerFactory<ClientHttp2ConnectionHandler> FACTORY =
+      new Http2Util.Http2ConnectionHandlerFactory<ClientHttp2ConnectionHandler>() {
+
+        @Override
+        public ClientHttp2ConnectionHandler create(
+            Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder) {
+          return new ClientHttp2ConnectionHandler(decoder, encoder);
+        }
+      };
+
+  public static ClientHttp2ConnectionHandler create(Channel channel,
+      Configuration conf) throws Http2Exception {
+    Http2Connection conn = new DefaultHttp2Connection(false);
+    ClientHttp2EventListener listener =
+        new ClientHttp2EventListener(channel, conn);
+    return Http2Util.create(conf, conn, listener, FACTORY,
+      LOG.isDebugEnabled() ? FRAME_LOGGER : null);
+  }
+}
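
A rough client wiring sketch (editor's illustration; assumes a plain NIO transport and an already-populated Hadoop Configuration):

    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import org.apache.hadoop.conf.Configuration;

    public class Http2ClientWiringExample {

      public static Channel connect(String host, int port,
          final Configuration conf) throws Exception {
        Bootstrap b = new Bootstrap().group(new NioEventLoopGroup())
            .channel(NioSocketChannel.class)
            .handler(new ChannelInitializer<SocketChannel>() {

              @Override
              protected void initChannel(SocketChannel ch) throws Exception {
                // The connection handler owns the HTTP/2 connection state;
                // stream channels are created on top of it afterwards with
                // Http2StreamBootstrap.
                ch.pipeline().addLast(
                  ClientHttp2ConnectionHandler.create(ch, conf));
              }
            });
        return b.connect(host, port).syncUninterruptibly().channel();
      }
    }
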
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/ClientHttp2EventListener.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/ClientHttp2EventListener.java
new file mode 100644
index 0000000..f4bbdba
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/ClientHttp2EventListener.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.web.http2;
+
+import io.netty.channel.Channel;
+import io.netty.handler.codec.http2.Http2Connection;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * An HTTP/2 FrameListener and EventListener to manage
+ * {@link Http2StreamChannel}s on the client side.
+ */
+@InterfaceAudience.Private
+public class ClientHttp2EventListener extends AbstractHttp2EventListener {
+
+  public ClientHttp2EventListener(Channel parentChannel, Http2Connection conn) {
+    super(parentChannel, conn);
+  }
+
+  @Override
+  protected void initChannelOnStreamActive(Http2StreamChannel subChannel) {
+    // disable read until pipeline initialized
+    subChannel.config().setAutoRead(false);
+  }
+
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/Http2DataReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/Http2DataReceiver.java
new file mode 100644
index 0000000..7cfbcce
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/Http2DataReceiver.java
@@ -0,0 +1,326 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.web.http2;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelConfig;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.handler.codec.http2.Http2Headers;
+import io.netty.handler.timeout.ReadTimeoutException;
+import io.netty.handler.timeout.ReadTimeoutHandler;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InterruptedIOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+import java.util.Deque;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * A helper class that wraps HTTP/2 data frames as an {@link InputStream}.
+ * <p>
+ * Notice that this class can be used together with
+ * {@link ReadTimeoutHandler} to limit the waiting time when reading data.
+ */
+@InterfaceAudience.Private
+public class Http2DataReceiver extends ChannelInboundHandlerAdapter {
+
+  private static final Component END_OF_STREAM = new Component(null, 0);
+
+  private static final EOFException EOF = new EOFException();
+
+  private static final class Component {
+
+    public final ByteBuf buf;
+
+    public final int length;
+
+    public Component(ByteBuf buf) {
+      this(buf, buf.readableBytes());
+    }
+
+    public Component(ByteBuf buf, int length) {
+      this.buf = buf;
+      this.length = length;
+    }
+
+  }
+
+  private final Deque<Component> queue = new ArrayDeque<Component>();
+
+  private int queuedBytes;
+
+  private Channel channel;
+
+  private Throwable error;
+
+  private Http2Headers headers;
+
+  private final ByteBufferReadableInputStream contentInput =
+      new ByteBufferReadableInputStream() {
+
+        @Override
+        public int read() throws IOException {
+          Component comp = peekUntilAvailable();
+          if (comp == END_OF_STREAM) {
+            return -1;
+          }
+          int b = comp.buf.readByte() & 0xFF;
+          if (!comp.buf.isReadable()) {
+            removeHead();
+          }
+          return b;
+        }
+
+        @Override
+        public int read(byte[] b, int off, int len) throws IOException {
+          Component comp = peekUntilAvailable();
+          if (comp == END_OF_STREAM) {
+            return -1;
+          }
+          int bufReadableBytes = comp.buf.readableBytes();
+          if (len >= bufReadableBytes) {
+            comp.buf.readBytes(b, off, bufReadableBytes);
+            removeHead();
+            return bufReadableBytes;
+          } else {
+            comp.buf.readBytes(b, off, len);
+            return len;
+          }
+        }
+
+        @Override
+        public long skip(long n) throws IOException {
+          Component comp = peekUntilAvailable();
+          if (comp == END_OF_STREAM) {
+            return 0L;
+          }
+          int bufReadableBytes = comp.buf.readableBytes();
+          if (n >= bufReadableBytes) {
+            removeHead();
+            return bufReadableBytes;
+          } else {
+            comp.buf.skipBytes((int) n);
+            return n;
+          }
+        }
+
+        @Override
+        public int read(ByteBuffer bb) throws IOException {
+          Component comp = peekUntilAvailable();
+          if (comp == END_OF_STREAM) {
+            return -1;
+          }
+          int bbRemaining = bb.remaining();
+          int bufReadableBytes = comp.buf.readableBytes();
+          if (bbRemaining >= bufReadableBytes) {
+            int toRestoredLimit = bb.limit();
+            bb.limit(bb.position() + bufReadableBytes);
+            comp.buf.readBytes(bb);
+            bb.limit(toRestoredLimit);
+            removeHead();
+            return bufReadableBytes;
+          } else {
+            comp.buf.readBytes(bb);
+            return bbRemaining;
+          }
+        }
+
+        private boolean closed = false;
+
+        @Override
+        public void close() throws IOException {
+          if (closed) {
+            return;
+          }
+          synchronized (queue) {
+            if (error == null) {
+              error = EOF;
+            }
+          }
+          channel.close().addListener(new ChannelFutureListener() {
+
+            @Override
+            public void operationComplete(ChannelFuture future)
+                throws Exception {
+              synchronized (queue) {
+                for (Component c; (c = queue.peek()) != null;) {
+                  if (c == END_OF_STREAM) {
+                    return;
+                  }
+                  c.buf.release();
+                  queue.remove();
+                }
+              }
+            }
+          });
+        }
+
+      };
+
+  @Override
+  public void channelRead(ChannelHandlerContext ctx, Object msg)
+      throws Exception {
+    if (msg == LastHttp2Message.get()) {
+      enqueue(END_OF_STREAM);
+    } else if (msg instanceof Http2Headers) {
+      synchronized (queue) {
+        headers = (Http2Headers) msg;
+        queue.notifyAll();
+      }
+    } else if (msg instanceof ByteBuf) {
+      ByteBuf buf = (ByteBuf) msg;
+      if (buf.isReadable()) {
+        enqueue(new Component(buf));
+      } else {
+        buf.release();
+      }
+    } else {
+      ctx.fireChannelRead(msg);
+    }
+  }
+
+  @Override
+  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
+      throws Exception {
+    synchronized (queue) {
+      if (error == null) {
+        error = cause;
+        queue.notifyAll();
+      }
+    }
+  }
+
+  @Override
+  public void channelActive(ChannelHandlerContext ctx) throws Exception {
+    channel = ctx.channel();
+  }
+
+  @Override
+  public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+    synchronized (queue) {
+      if (error != null) {
+        return;
+      }
+      Component lastComp = queue.peekLast();
+      if (lastComp == END_OF_STREAM) {
+        return;
+      }
+      error = EOF;
+      queue.notifyAll();
+    }
+  }
+
+  private void enqueue(Component comp) {
+    synchronized (queue) {
+      queuedBytes += comp.length;
+      if (queuedBytes >= channel.config().getWriteBufferHighWaterMark()) {
+        channel.config().setAutoRead(false);
+      }
+      queue.add(comp);
+      queue.notifyAll();
+    }
+  }
+
+  private Component peekUntilAvailable() throws IOException {
+    Throwable cause;
+    synchronized (queue) {
+      for (;;) {
+        if (!queue.isEmpty()) {
+          return queue.peek();
+        }
+        if (error != null) {
+          cause = error;
+          break;
+        }
+        try {
+          queue.wait();
+        } catch (InterruptedException e) {
+          throw new InterruptedIOException();
+        }
+      }
+    }
+    propagate(cause);
+    return null;
+  }
+
+  private void removeHead() {
+    Component comp;
+    synchronized (queue) {
+      comp = queue.remove();
+      queuedBytes -= comp.length;
+      ChannelConfig config = channel.config();
+      if (!config.isAutoRead()
+          && queuedBytes < config.getWriteBufferLowWaterMark()) {
+        config.setAutoRead(true);
+      }
+    }
+    comp.buf.release();
+  }
+
+  private void propagate(Throwable cause) throws IOException {
+    if (cause == ReadTimeoutException.INSTANCE) {
+      throw new IOException("Read timeout");
+    } else if (cause == EOF) {
+      throw new IOException("Stream reset by peer: " + channel.remoteAddress());
+    } else if (cause instanceof IOException) {
+      throw (IOException) cause;
+    } else if (cause instanceof RuntimeException) {
+      throw (RuntimeException) cause;
+    } else {
+      throw new IOException(cause);
+    }
+  }
+
+  public Http2Headers waitForResponse() throws IOException {
+    Throwable cause;
+    synchronized (queue) {
+      for (;;) {
+        if (error != null) {
+          cause = error;
+          break;
+        }
+        if (headers != null) {
+          return headers;
+        }
+        try {
+          queue.wait();
+        } catch (InterruptedException e) {
+          throw new InterruptedIOException();
+        }
+      }
+    }
+    propagate(cause);
+    return null;
+  }
+
+  /**
+   * The returned stream is not thread safe.
+   */
+  public ByteBufferReadableInputStream content() {
+    return contentInput;
+  }
+
+}
\ No newline at end of file
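
A consumption sketch (editor's illustration; assumes a Http2DataReceiver instance was installed last on the stream channel pipeline, e.g. by the bootstrap in the next file):

    import io.netty.handler.codec.http2.Http2Headers;
    import java.io.InputStream;

    public class Http2DataReceiverExample {

      public static void drain(Http2DataReceiver receiver) throws Exception {
        // Blocks until the response HEADERS frame arrives or an error is set.
        Http2Headers responseHeaders = receiver.waitForResponse();
        System.out.println("status: " + responseHeaders.status());
        // Stream the DATA frames; close() resets the stream and frees buffers.
        try (InputStream in = receiver.content()) {
          byte[] buf = new byte[4096];
          for (int n; (n = in.read(buf)) != -1;) {
            // consume n bytes here; auto-read is throttled once the queued
            // bytes cross the channel's high water mark (backpressure)
          }
        }
      }
    }
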
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/Http2StreamBootstrap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/Http2StreamBootstrap.java
new file mode 100644
index 0000000..4d0b32b
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/Http2StreamBootstrap.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.web.http2;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.EventLoop;
+import io.netty.handler.codec.http2.Http2Headers;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.FutureListener;
+import io.netty.util.concurrent.Promise;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * A bootstrap that makes it easy for clients to set up an
+ * {@link Http2StreamChannel}.
+ * <p>
+ * Call {@link #connect()} when you have finished setting everything up, to
+ * establish the {@link Http2StreamChannel}.
+ */
+@InterfaceAudience.Private
+public class Http2StreamBootstrap {
+  private Channel channel;
+
+  private Http2Headers headers;
+
+  private ByteBuf data = Unpooled.EMPTY_BUFFER;
+
+  private boolean endStream;
+
+  private ChannelHandler handler;
+
+  /**
+   * Set the {@link Channel} this HTTP/2 stream is running on.
+   */
+  public Http2StreamBootstrap channel(Channel channel) {
+    this.channel = channel;
+    return this;
+  }
+
+  /**
+   * Set the request headers.
+   */
+  public Http2StreamBootstrap headers(Http2Headers headers) {
+    this.headers = headers;
+    return this;
+  }
+
+  /**
+   * Set the request data.
+   * <p>
+   * This can be used to avoid an extra context switch when you only need to
+   * send a small piece of data.
+   */
+  public Http2StreamBootstrap data(ByteBuf data) {
+    this.data = data;
+    return this;
+  }
+
+  /**
+   * Set whether no data will follow the headers being sent.
+   * <p>
+   * Default is <tt>false</tt>, which means you can still send data using the
+   * returned {@link Http2StreamChannel}.
+   */
+  public Http2StreamBootstrap endStream(boolean endStream) {
+    this.endStream = endStream;
+    return this;
+  }
+
+  /**
+   * Set the {@link ChannelHandler} which is used to serve the request.
+   * <p>
+   * Typically, you should use a {@link ChannelInitializer} here.
+   */
+  public Http2StreamBootstrap handler(ChannelHandler handler) {
+    this.handler = handler;
+    return this;
+  }
+
+  /**
+   * Establish the {@link Http2StreamChannel}. You can get it with the returned
+   * {@link Future}.
+   */
+  public Future<Http2StreamChannel> connect() {
+    Preconditions.checkNotNull(headers);
+    Preconditions.checkNotNull(handler);
+    final Promise<Http2StreamChannel> registeredPromise =
+        channel.eventLoop().<Http2StreamChannel> newPromise();
+
+    final StartHttp2StreamRequest request =
+        new StartHttp2StreamRequest(headers, data, endStream, channel
+            .eventLoop().<Http2StreamChannel> newPromise()
+            .addListener(new FutureListener<Http2StreamChannel>() {
+
+              @Override
+              public void operationComplete(Future<Http2StreamChannel> future)
+                  throws Exception {
+                if (future.isSuccess()) {
+                  final Http2StreamChannel subChannel = future.get();
+                  subChannel.pipeline().addFirst(handler);
+                  channel.eventLoop().register(subChannel)
+                      .addListener(new ChannelFutureListener() {
+
+                        @Override
+                        public void operationComplete(ChannelFuture future)
+                            throws Exception {
+                          if (future.isSuccess()) {
+                            subChannel.config().setAutoRead(true);
+                            registeredPromise.setSuccess(subChannel);
+                          } else {
+                            registeredPromise.setFailure(future.cause());
+                          }
+                        }
+                      });
+                } else {
+                  registeredPromise.setFailure(future.cause());
+                }
+              }
+
+            }));
+    EventLoop eventLoop = channel.eventLoop();
+    if (eventLoop.inEventLoop()) {
+      channel.writeAndFlush(request);
+    } else {
+      channel.eventLoop().execute(new Runnable() {
+
+        @Override
+        public void run() {
+          channel.writeAndFlush(request);
+        }
+      });
+    }
+
+    return registeredPromise;
+  }
+}
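
A request sketch (editor's illustration; hypothetical endpoint, channel being a connected channel whose pipeline already contains ClientHttp2ConnectionHandler):

    import io.netty.channel.Channel;
    import io.netty.channel.ChannelInitializer;
    import io.netty.handler.codec.http2.DefaultHttp2Headers;
    import io.netty.handler.codec.http2.Http2Headers;
    import io.netty.util.concurrent.Future;

    public class Http2StreamBootstrapExample {

      public static Http2StreamChannel startGet(Channel channel) {
        Http2Headers headers = new DefaultHttp2Headers().method("GET")
            .scheme("http").authority("127.0.0.1:12345").path("/");
        Future<Http2StreamChannel> future = new Http2StreamBootstrap()
            .channel(channel)
            .headers(headers)
            .endStream(true) // no request body follows the headers
            .handler(new ChannelInitializer<Http2StreamChannel>() {

              @Override
              protected void initChannel(Http2StreamChannel ch) throws Exception {
                ch.pipeline().addLast(new Http2DataReceiver());
              }
            })
            .connect();
        return future.syncUninterruptibly().getNow();
      }
    }
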
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/Http2StreamChannel.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/Http2StreamChannel.java
index 658ffe4..ffaf3ef 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/Http2StreamChannel.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/Http2StreamChannel.java
@@ -18,9 +18,12 @@
 package org.apache.hadoop.hdfs.web.http2;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
 import io.netty.channel.AbstractChannel;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelConfig;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelMetadata;
 import io.netty.channel.ChannelOutboundBuffer;
@@ -32,8 +35,11 @@
 import io.netty.handler.codec.http2.Http2ConnectionEncoder;
 import io.netty.handler.codec.http2.Http2ConnectionHandler;
 import io.netty.handler.codec.http2.Http2Error;
+import io.netty.handler.codec.http2.Http2Exception;
 import io.netty.handler.codec.http2.Http2Headers;
+import io.netty.handler.codec.http2.Http2LocalFlowController;
 import io.netty.handler.codec.http2.Http2Stream;
+import io.netty.util.ReferenceCountUtil;
 import io.netty.util.internal.InternalThreadLocalMap;
 
 import java.net.SocketAddress;
@@ -64,10 +70,33 @@
   private static final int MAX_READER_STACK_DEPTH = 8;
 
   private final ChannelHandlerContext http2ConnHandlerCtx;
+
   private final Http2Stream stream;
+
+  private final Http2LocalFlowController localFlowController;
+
   private final Http2ConnectionEncoder encoder;
+
   private final DefaultChannelConfig config;
-  private final Queue<Object> inboundMessageQueue = new ArrayDeque<>();
+
+  private static final class InboundMessage {
+
+    public final Object msg;
+
+    public final int length;
+
+    public InboundMessage(Object msg, int length) {
+      this.msg = msg;
+      this.length = length;
+    }
+
+  }
+
+  private final Queue<InboundMessage> inboundMessageQueue = new ArrayDeque<>();
+
+  private boolean writePending = false;
+
+  private int pendingOutboundBytes;
 
   private enum State {
     OPEN, HALF_CLOSED_LOCAL, HALF_CLOSED_REMOTE, PRE_CLOSED, CLOSED
@@ -82,10 +111,16 @@
     Http2ConnectionHandler connHandler =
         (Http2ConnectionHandler) http2ConnHandlerCtx.handler();
     this.stream = stream;
+    this.localFlowController =
+        connHandler.connection().local().flowController();
     this.encoder = connHandler.encoder();
     this.config = new DefaultChannelConfig(this);
   }
 
+  public Http2Stream stream() {
+    return stream;
+  }
+
   @Override
   public ChannelConfig config() {
     return config;
@@ -98,8 +133,6 @@
 
   @Override
   public boolean isActive() {
-    // we create this channel after HTTP/2 stream active, so we do not have a
-    // separated 'active' state.
     return isOpen();
   }
 
@@ -115,6 +148,10 @@
         SocketAddress localAddress, ChannelPromise promise) {
       throw new UnsupportedOperationException();
     }
+
+    public void forceFlush() {
+      super.flush0();
+    }
   }
 
   @Override
@@ -149,7 +186,11 @@
 
   @Override
   protected void doClose() throws Exception {
-    if (stream.state() != Http2Stream.State.CLOSED) {
+    for (InboundMessage msg; (msg = inboundMessageQueue.poll()) != null;) {
+      ReferenceCountUtil.release(msg.msg);
+      localFlowController.consumeBytes(stream, msg.length);
+    }
+    if (state != State.PRE_CLOSED) {
       encoder.writeRstStream(http2ConnHandlerCtx, stream.id(),
         Http2Error.INTERNAL_ERROR.code(), http2ConnHandlerCtx.newPromise());
     }
@@ -157,36 +198,44 @@
   }
 
   private final Runnable readTask = new Runnable() {
-
     @Override
     public void run() {
       ChannelPipeline pipeline = pipeline();
-      int maxMessagesPerRead = config().getMaxMessagesPerRead();
-      for (int i = 0; i < maxMessagesPerRead; i++) {
-        Object m = inboundMessageQueue.poll();
-        if (m == null) {
-          break;
-        }
-        if (m == LastHttp2Message.get()) {
+      for (InboundMessage m; (m = inboundMessageQueue.poll()) != null;) {
+        if (m.msg == LastHttp2Message.get()) {
           state =
               state == State.HALF_CLOSED_LOCAL ? State.PRE_CLOSED
                   : State.HALF_CLOSED_REMOTE;
         }
-        pipeline.fireChannelRead(m);
+        try {
+          if (m.length > 0
+              && localFlowController.consumeBytes(stream, m.length)) {
+            http2ConnHandlerCtx.channel().flush();
+          }
+        } catch (Http2Exception e) {
+          // an Http2Exception at least means the stream is broken (maybe the
+          // whole connection as well), so we bail out.
+          http2ConnHandlerCtx.pipeline().fireExceptionCaught(e);
+          return;
+        }
+        pipeline.fireChannelRead(m.msg);
       }
       pipeline.fireChannelReadComplete();
+      if (state == State.PRE_CLOSED) {
+        close();
+      }
     }
   };
 
   @Override
   protected void doBeginRead() throws Exception {
-    State currentState = this.state;
-    if (currentState == State.CLOSED) {
-      throw new ClosedChannelException();
-    }
     if (inboundMessageQueue.isEmpty()) {
       return;
     }
+    State currentState = this.state;
+    if (remoteSideClosed(currentState)) {
+      throw new ClosedChannelException();
+    }
     final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
     final Integer stackDepth = threadLocals.localChannelReaderStackDepth();
     if (stackDepth < MAX_READER_STACK_DEPTH) {
@@ -201,14 +250,25 @@
     }
   }
 
+  private void resumeWrite() {
+    writePending = false;
+    ((Http2Unsafe) unsafe()).forceFlush();
+  }
+
   @Override
   protected void doWrite(ChannelOutboundBuffer in) throws Exception {
     State currentState = this.state;
-    if (currentState == State.CLOSED) {
+    if (localSideClosed(currentState)) {
       throw new ClosedChannelException();
     }
+    int writeBufferHighWaterMark = config().getWriteBufferHighWaterMark();
+
     boolean flush = false;
     for (;;) {
+      if (pendingOutboundBytes >= writeBufferHighWaterMark) {
+        writePending = true;
+        break;
+      }
       Object msg = in.current();
       if (msg == null) {
         break;
@@ -217,15 +277,30 @@
         this.state =
             currentState == State.HALF_CLOSED_REMOTE ? State.PRE_CLOSED
                 : State.HALF_CLOSED_LOCAL;
-        encoder.writeData(http2ConnHandlerCtx, stream.id(), http2ConnHandlerCtx
-            .alloc().buffer(0), 0, true, http2ConnHandlerCtx.newPromise());
+        encoder.writeData(http2ConnHandlerCtx, stream.id(),
+          Unpooled.EMPTY_BUFFER, 0, true, http2ConnHandlerCtx.newPromise());
       } else if (msg instanceof Http2Headers) {
         encoder.writeHeaders(http2ConnHandlerCtx, stream.id(),
           (Http2Headers) msg, 0, false, http2ConnHandlerCtx.newPromise());
       } else if (msg instanceof ByteBuf) {
         ByteBuf data = (ByteBuf) msg;
+        final int pendingBytes = data.readableBytes();
+        pendingOutboundBytes += pendingBytes;
         encoder.writeData(http2ConnHandlerCtx, stream.id(), data.retain(), 0,
-          false, http2ConnHandlerCtx.newPromise());
+          false, http2ConnHandlerCtx.newPromise()).addListener(
+          new ChannelFutureListener() {
+
+            @Override
+            public void operationComplete(ChannelFuture future)
+                throws Exception {
+              pendingOutboundBytes -= pendingBytes;
+              if (writePending
+                  && pendingOutboundBytes <= config()
+                      .getWriteBufferLowWaterMark()) {
+                resumeWrite();
+              }
+            }
+          });
       } else {
         throw new UnsupportedMessageTypeException(msg, Http2Headers.class,
             ByteBuf.class);
@@ -236,33 +311,53 @@
     if (flush) {
       http2ConnHandlerCtx.channel().flush();
     }
+    if (state == State.PRE_CLOSED) {
+      close();
+    }
   }
 
-  /**
-   * Append a message to the inbound queue of this channel. You need to call
-   * {@link #read()} if you want to pass the message to handlers.
-   */
-  void writeInbound(Object msg) {
-    inboundMessageQueue.add(msg);
+  public void writeInbound(Object msg, int length) {
+    inboundMessageQueue.add(new InboundMessage(msg, length));
   }
 
   private static final ImmutableSet<State> REMOTE_SIDE_CLOSED_STATES =
       ImmutableSet.of(State.HALF_CLOSED_REMOTE, State.PRE_CLOSED, State.CLOSED);
 
-  /**
-   * @return true if remote side finishes sending data to us.
-   */
   public boolean remoteSideClosed() {
+    return remoteSideClosed(state);
+  }
+
+  private boolean remoteSideClosed(State state) {
     return REMOTE_SIDE_CLOSED_STATES.contains(state);
   }
 
   private static final ImmutableSet<State> LOCAL_SIDE_CLOSED_STATES =
       ImmutableSet.of(State.HALF_CLOSED_LOCAL, State.PRE_CLOSED, State.CLOSED);
 
-  /**
-   * @return true if we finish sending data to remote side.
-   */
   public boolean localSideClosed() {
+    return localSideClosed(state);
+  }
+
+  private boolean localSideClosed(State state) {
     return LOCAL_SIDE_CLOSED_STATES.contains(state);
   }
+
+  public void setClosed() {
+    State currentState = this.state;
+    if (!remoteSideClosed(currentState)) {
+      writeInbound(LastHttp2Message.get(), 0);
+      if (config().isAutoRead()) {
+        read();
+      }
+    }
+    currentState = this.state;
+    if (!localSideClosed(currentState)) {
+      this.state =
+          currentState == State.HALF_CLOSED_REMOTE ? State.PRE_CLOSED
+              : State.HALF_CLOSED_LOCAL;
+      if (this.state == State.PRE_CLOSED) {
+        close();
+      }
+    }
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/Http2Util.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/Http2Util.java
new file mode 100644
index 0000000..dfe9c3e
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/Http2Util.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.web.http2;
+
+import io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder;
+import io.netty.handler.codec.http2.DefaultHttp2ConnectionEncoder;
+import io.netty.handler.codec.http2.DefaultHttp2FrameReader;
+import io.netty.handler.codec.http2.DefaultHttp2FrameWriter;
+import io.netty.handler.codec.http2.DefaultHttp2LocalFlowController;
+import io.netty.handler.codec.http2.Http2CodecUtil;
+import io.netty.handler.codec.http2.Http2Connection;
+import io.netty.handler.codec.http2.Http2ConnectionDecoder;
+import io.netty.handler.codec.http2.Http2ConnectionEncoder;
+import io.netty.handler.codec.http2.Http2ConnectionHandler;
+import io.netty.handler.codec.http2.Http2EventAdapter;
+import io.netty.handler.codec.http2.Http2Exception;
+import io.netty.handler.codec.http2.Http2FrameLogger;
+import io.netty.handler.codec.http2.Http2FrameReader;
+import io.netty.handler.codec.http2.Http2FrameWriter;
+import io.netty.handler.codec.http2.Http2InboundFrameLogger;
+import io.netty.handler.codec.http2.Http2OutboundFrameLogger;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+
+/**
+ * Helper methods for creating HTTP/2 connection handlers with shared settings.
+ */
+@InterfaceAudience.Private
+public class Http2Util {
+  public interface Http2ConnectionHandlerFactory<T extends Http2ConnectionHandler> {
+    T create(Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder);
+  }
+
+  @SuppressWarnings("resource")
+  static <T extends Http2ConnectionHandler> T create(Configuration conf,
+      Http2Connection conn, Http2EventAdapter listener,
+      Http2ConnectionHandlerFactory<T> handlerFactory,
+      Http2FrameLogger frameLogger) throws Http2Exception {
+    conn.addListener(listener);
+    DefaultHttp2FrameReader rawFrameReader = new DefaultHttp2FrameReader();
+    DefaultHttp2FrameWriter rawFrameWriter = new DefaultHttp2FrameWriter();
+    rawFrameWriter.maxFrameSize(Http2CodecUtil.MAX_FRAME_SIZE_UPPER_BOUND);
+    Http2FrameReader frameReader;
+    Http2FrameWriter frameWriter;
+    if (frameLogger != null) {
+      frameReader = new Http2InboundFrameLogger(rawFrameReader, frameLogger);
+      frameWriter = new Http2OutboundFrameLogger(rawFrameWriter, frameLogger);
+    } else {
+      frameReader = rawFrameReader;
+      frameWriter = rawFrameWriter;
+    }
+    DefaultHttp2LocalFlowController localFlowController =
+        new DefaultHttp2LocalFlowController(conn, frameWriter, conf.getFloat(
+          DFSConfigKeys.DFS_HTTP2_WINDOW_UPDATE_RATIO,
+          DFSConfigKeys.DFS_HTTP2_WINDOW_UPDATE_RATIO_DEFAULT));
+    int initialWindowSize =
+        conf.getInt(DFSConfigKeys.DFS_HTTP2_INITIAL_WINDOW_SIZE,
+          DFSConfigKeys.DFS_HTTP2_INITIAL_WINDOW_SIZE_DEFAULT);
+    localFlowController.initialWindowSize(initialWindowSize);
+    conn.local().flowController(localFlowController);
+
+    DefaultHttp2ConnectionEncoder encoder =
+        new DefaultHttp2ConnectionEncoder(conn, frameWriter);
+    DefaultHttp2ConnectionDecoder decoder =
+        new DefaultHttp2ConnectionDecoder(conn, encoder, frameReader, listener);
+    return handlerFactory.create(decoder, encoder);
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/ServerHttp2ConnectionHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/ServerHttp2ConnectionHandler.java
index 1ee733d..964fd0b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/ServerHttp2ConnectionHandler.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/ServerHttp2ConnectionHandler.java
@@ -20,21 +20,18 @@
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelInitializer;
 import io.netty.handler.codec.http2.DefaultHttp2Connection;
-import io.netty.handler.codec.http2.DefaultHttp2FrameReader;
-import io.netty.handler.codec.http2.DefaultHttp2FrameWriter;
 import io.netty.handler.codec.http2.Http2Connection;
+import io.netty.handler.codec.http2.Http2ConnectionDecoder;
+import io.netty.handler.codec.http2.Http2ConnectionEncoder;
 import io.netty.handler.codec.http2.Http2ConnectionHandler;
-import io.netty.handler.codec.http2.Http2FrameListener;
+import io.netty.handler.codec.http2.Http2Exception;
 import io.netty.handler.codec.http2.Http2FrameLogger;
-import io.netty.handler.codec.http2.Http2FrameReader;
-import io.netty.handler.codec.http2.Http2FrameWriter;
-import io.netty.handler.codec.http2.Http2InboundFrameLogger;
-import io.netty.handler.codec.http2.Http2OutboundFrameLogger;
 import io.netty.handler.logging.LogLevel;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
 
 /**
  * An {@link Http2ConnectionHandler} used at server side.
@@ -48,39 +45,36 @@
   private static final Http2FrameLogger FRAME_LOGGER = new Http2FrameLogger(
       LogLevel.INFO, ServerHttp2ConnectionHandler.class);
 
-  private ServerHttp2ConnectionHandler(Http2Connection connection,
-      Http2FrameReader frameReader, Http2FrameWriter frameWriter,
-      Http2FrameListener listener) {
-    super(connection, frameReader, frameWriter, listener);
+  private ServerHttp2ConnectionHandler(Http2ConnectionDecoder decoder,
+      Http2ConnectionEncoder encoder) {
+    super(decoder, encoder);
   }
 
+  private static final Http2Util.Http2ConnectionHandlerFactory<ServerHttp2ConnectionHandler> FACTORY =
+      new Http2Util.Http2ConnectionHandlerFactory<ServerHttp2ConnectionHandler>() {
+
+        @Override
+        public ServerHttp2ConnectionHandler create(
+            Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder) {
+          return new ServerHttp2ConnectionHandler(decoder, encoder);
+        }
+      };
+
   /**
    * Create and initialize an {@link ServerHttp2ConnectionHandler}.
    * @param channel
    * @param initializer
-   * @param verbose whether to log inbound and outbound HTTP/2 messages
+   * @param conf
    * @return the initialized {@link ServerHttp2ConnectionHandler}
+   * @throws Http2Exception
    */
   public static ServerHttp2ConnectionHandler create(Channel channel,
-      ChannelInitializer<Http2StreamChannel> initializer) {
+      ChannelInitializer<Http2StreamChannel> initializer, Configuration conf)
+      throws Http2Exception {
     Http2Connection conn = new DefaultHttp2Connection(true);
     ServerHttp2EventListener listener =
         new ServerHttp2EventListener(channel, conn, initializer);
-    conn.addListener(listener);
-    Http2FrameReader frameReader;
-    Http2FrameWriter frameWriter;
-    if (LOG.isDebugEnabled()) {
-      frameReader =
-          new Http2InboundFrameLogger(new DefaultHttp2FrameReader(),
-              FRAME_LOGGER);
-      frameWriter =
-          new Http2OutboundFrameLogger(new DefaultHttp2FrameWriter(),
-              FRAME_LOGGER);
-    } else {
-      frameReader = new DefaultHttp2FrameReader();
-      frameWriter = new DefaultHttp2FrameWriter();
-    }
-    return new ServerHttp2ConnectionHandler(conn, frameReader, frameWriter,
-        listener);
+    return Http2Util.create(conf, conn, listener, FACTORY,
+      LOG.isDebugEnabled() ? FRAME_LOGGER : null);
   }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/ServerHttp2EventListener.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/ServerHttp2EventListener.java
index 72e3879..f58f6a3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/ServerHttp2EventListener.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/ServerHttp2EventListener.java
@@ -17,17 +17,9 @@
  */
 package org.apache.hadoop.hdfs.web.http2;
 
-import io.netty.buffer.ByteBuf;
 import io.netty.channel.Channel;
-import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInitializer;
 import io.netty.handler.codec.http2.Http2Connection;
-import io.netty.handler.codec.http2.Http2Connection.PropertyKey;
-import io.netty.handler.codec.http2.Http2Error;
-import io.netty.handler.codec.http2.Http2EventAdapter;
-import io.netty.handler.codec.http2.Http2Exception;
-import io.netty.handler.codec.http2.Http2Headers;
-import io.netty.handler.codec.http2.Http2Stream;
 import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.FutureListener;
 
@@ -35,35 +27,21 @@
 
 /**
  * An HTTP/2 FrameListener and EventListener to manage
- * {@link Http2StreamChannel}s.
- * <p>
- * We do not handle onRstStreamRead here, a stream that being reset will also
- * call onStreamClosed. The upper layer should not rely on a reset event.
+ * {@link Http2StreamChannel}s on the server side.
  */
 @InterfaceAudience.Private
-public class ServerHttp2EventListener extends Http2EventAdapter {
-
-  private final Channel parentChannel;
+public class ServerHttp2EventListener extends AbstractHttp2EventListener {
 
   private final ChannelInitializer<Http2StreamChannel> subChannelInitializer;
 
-  private final Http2Connection conn;
-
-  private final PropertyKey subChannelPropKey;
-
   public ServerHttp2EventListener(Channel parentChannel, Http2Connection conn,
       ChannelInitializer<Http2StreamChannel> subChannelInitializer) {
-    this.parentChannel = parentChannel;
-    this.conn = conn;
+    super(parentChannel, conn);
     this.subChannelInitializer = subChannelInitializer;
-    this.subChannelPropKey = conn.newKey();
   }
 
   @Override
-  public void onStreamActive(final Http2Stream stream) {
-    Http2StreamChannel subChannel =
-        new Http2StreamChannel(parentChannel, stream);
-    stream.setProperty(subChannelPropKey, subChannel);
+  protected void initChannelOnStreamActive(final Http2StreamChannel subChannel) {
     subChannel.pipeline().addFirst(subChannelInitializer);
     parentChannel.eventLoop().register(subChannel)
         .addListener(new FutureListener<Void>() {
@@ -71,65 +49,10 @@
           @Override
           public void operationComplete(Future<Void> future) throws Exception {
             if (!future.isSuccess()) {
-              stream.removeProperty(subChannelPropKey);
+              subChannel.stream().removeProperty(subChannelPropKey);
             }
           }
 
         });
-
-  }
-
-  @Override
-  public void onStreamClosed(Http2Stream stream) {
-    Http2StreamChannel subChannel = stream.removeProperty(subChannelPropKey);
-    if (subChannel != null) {
-      subChannel.close();
-    }
-  }
-
-  private Http2StreamChannel getSubChannel(int streamId) throws Http2Exception {
-    Http2StreamChannel subChannel =
-        conn.stream(streamId).getProperty(subChannelPropKey);
-    if (subChannel == null) {
-      throw Http2Exception.streamError(streamId, Http2Error.INTERNAL_ERROR,
-        "No sub channel found");
-    }
-    return subChannel;
-  }
-
-  private void writeInbound(int streamId, Object msg, boolean endOfStream)
-      throws Http2Exception {
-    Http2StreamChannel subChannel = getSubChannel(streamId);
-    subChannel.writeInbound(msg);
-    if (endOfStream) {
-      subChannel.writeInbound(LastHttp2Message.get());
-    }
-    if (subChannel.config().isAutoRead()) {
-      subChannel.read();
-    }
-
-  }
-
-  @Override
-  public void onHeadersRead(ChannelHandlerContext ctx, int streamId,
-      Http2Headers headers, int padding, boolean endOfStream)
-      throws Http2Exception {
-    writeInbound(streamId, headers, endOfStream);
-  }
-
-  @Override
-  public void onHeadersRead(ChannelHandlerContext ctx, int streamId,
-      Http2Headers headers, int streamDependency, short weight,
-      boolean exclusive, int padding, boolean endOfStream)
-      throws Http2Exception {
-    onHeadersRead(ctx, streamId, headers, padding, endOfStream);
-  }
-
-  @Override
-  public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data,
-      int padding, boolean endOfStream) throws Http2Exception {
-    int pendingBytes = data.readableBytes() + padding;
-    writeInbound(streamId, data.retain(), endOfStream);
-    return pendingBytes;
   }
 }
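
Reviewer note: the per-stream bookkeeping deleted above does not disappear; this hunk hoists it into AbstractHttp2EventListener so the client- and server-side listeners can share it. That class is added elsewhere in this patch and is not visible in these hunks, so the following is only a hedged sketch of its shape, inferred from how the subclass uses it (the super(parentChannel, conn) call, the protected subChannelPropKey, and the initChannelOnStreamActive hook). The deleted onHeadersRead/onDataRead/writeInbound callbacks presumably moved there as well and are omitted from the sketch.

    // Hedged sketch: inferred from the subclass above, not copied from the
    // patch. Anything beyond the names referenced by the subclass is an
    // assumption.
    abstract class AbstractHttp2EventListener extends Http2EventAdapter {

      protected final Channel parentChannel;

      protected final Http2Connection conn;

      protected final PropertyKey subChannelPropKey;

      protected AbstractHttp2EventListener(Channel parentChannel,
          Http2Connection conn) {
        this.parentChannel = parentChannel;
        this.conn = conn;
        this.subChannelPropKey = conn.newKey();
      }

      @Override
      public void onStreamActive(Http2Stream stream) {
        // Attach a sub channel to the newly active stream, then let the
        // subclass initialize and register it.
        Http2StreamChannel subChannel =
            new Http2StreamChannel(parentChannel, stream);
        stream.setProperty(subChannelPropKey, subChannel);
        initChannelOnStreamActive(subChannel);
      }

      @Override
      public void onStreamClosed(Http2Stream stream) {
        // Same body as the onStreamClosed deleted from the server listener.
        Http2StreamChannel subChannel =
            stream.removeProperty(subChannelPropKey);
        if (subChannel != null) {
          subChannel.close();
        }
      }

      protected abstract void initChannelOnStreamActive(
          Http2StreamChannel subChannel);
    }
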
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/StartHttp2StreamRequest.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/StartHttp2StreamRequest.java
new file mode 100644
index 0000000..cda447b
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/StartHttp2StreamRequest.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.web.http2;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.handler.codec.http2.Http2Headers;
+import io.netty.util.concurrent.Promise;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Used by {@link Http2StreamBootstrap} to establish an
+ * {@link Http2StreamChannel}.
+ */
+@InterfaceAudience.Private
+class StartHttp2StreamRequest {
+
+  final Http2Headers headers;
+
+  final ByteBuf data;
+
+  final boolean endStream;
+
+  final Promise<Http2StreamChannel> promise;
+
+  StartHttp2StreamRequest(Http2Headers headers, ByteBuf data,
+      boolean endStream, Promise<Http2StreamChannel> promise) {
+    this.headers = headers;
+    this.data = data;
+    this.endStream = endStream;
+    this.promise = promise;
+  }
+}
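
StartHttp2StreamRequest itself is a plain value object; the interesting part is the hand-off. Judging from the fields above and the Http2StreamBootstrap usage in the tests below, connect() presumably packages the bootstrap state into one of these requests, writes it to the parent connection channel, and returns the promise, which the client connection handler completes once the stream is active. A hedged sketch of that hand-off (the constructor call is real; everything around it is an assumption):

    // Hedged sketch of the expected flow inside Http2StreamBootstrap.connect().
    Future<Http2StreamChannel> connect(Channel parent, Http2Headers headers,
        ByteBuf data, boolean endStream) {
      Promise<Http2StreamChannel> promise = parent.eventLoop().newPromise();
      // The client-side connection handler is expected to allocate a stream
      // id, send HEADERS (plus the optional first DATA buffer), and complete
      // the promise with the Http2StreamChannel bound to the new stream.
      parent.writeAndFlush(
          new StartHttp2StreamRequest(headers, data, endStream, promise));
      return promise;
    }
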
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/Http2ResponseHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/Http2ResponseHandler.java
deleted file mode 100644
index 1e1acdd..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/Http2ResponseHandler.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.datanode.web.dtp;
-
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.SimpleChannelInboundHandler;
-import io.netty.handler.codec.http.FullHttpResponse;
-import io.netty.handler.codec.http2.HttpUtil;
-import io.netty.util.concurrent.Promise;
-
-import java.util.concurrent.ConcurrentMap;
-
-import net.sf.ehcache.store.chm.ConcurrentHashMap;
-
-public class Http2ResponseHandler extends
-    SimpleChannelInboundHandler<FullHttpResponse> {
-
-  private ConcurrentMap<Integer, Promise<FullHttpResponse>> streamId2Promise =
-      new ConcurrentHashMap<>();
-
-  @Override
-  protected void channelRead0(ChannelHandlerContext ctx, FullHttpResponse msg)
-      throws Exception {
-    Integer streamId =
-        msg.headers().getInt(HttpUtil.ExtensionHeaderNames.STREAM_ID.text());
-    if (streamId == null) {
-      System.err.println("HttpResponseHandler unexpected message received: "
-          + msg);
-      return;
-    }
-    if (streamId.intValue() == 1) {
-      // this is the upgrade response message, just ignore it.
-      return;
-    }
-    Promise<FullHttpResponse> promise = streamId2Promise.get(streamId);
-    if (promise == null) {
-      System.err.println("Message received for unknown stream id " + streamId);
-    } else {
-      // Do stuff with the message (for now just print it)
-      promise.setSuccess(msg.retain());
-
-    }
-  }
-
-  public void put(Integer streamId, Promise<FullHttpResponse> promise) {
-    streamId2Promise.put(streamId, promise);
-  }
-}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/TestDtpHttp2.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/TestDtpHttp2.java
index eaa63a4..4d4ac37 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/TestDtpHttp2.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/TestDtpHttp2.java
@@ -19,35 +19,16 @@
 
 import static org.junit.Assert.assertEquals;
 import io.netty.bootstrap.Bootstrap;
-import io.netty.buffer.ByteBuf;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.nio.NioSocketChannel;
-import io.netty.handler.codec.http.DefaultFullHttpRequest;
-import io.netty.handler.codec.http.FullHttpRequest;
-import io.netty.handler.codec.http.FullHttpResponse;
 import io.netty.handler.codec.http.HttpMethod;
 import io.netty.handler.codec.http.HttpResponseStatus;
-import io.netty.handler.codec.http.HttpVersion;
-import io.netty.handler.codec.http2.DefaultHttp2Connection;
-import io.netty.handler.codec.http2.DefaultHttp2FrameReader;
-import io.netty.handler.codec.http2.DefaultHttp2FrameWriter;
-import io.netty.handler.codec.http2.DelegatingDecompressorFrameListener;
-import io.netty.handler.codec.http2.Http2Connection;
-import io.netty.handler.codec.http2.Http2ConnectionHandler;
-import io.netty.handler.codec.http2.Http2FrameLogger;
-import io.netty.handler.codec.http2.Http2FrameReader;
-import io.netty.handler.codec.http2.Http2FrameWriter;
-import io.netty.handler.codec.http2.Http2InboundFrameLogger;
-import io.netty.handler.codec.http2.Http2OutboundFrameLogger;
-import io.netty.handler.codec.http2.HttpToHttp2ConnectionHandler;
-import io.netty.handler.codec.http2.HttpUtil;
-import io.netty.handler.codec.http2.InboundHttp2ToHttpAdapter;
-import io.netty.handler.logging.LogLevel;
+import io.netty.handler.codec.http2.DefaultHttp2Headers;
 import io.netty.handler.timeout.TimeoutException;
-import io.netty.util.concurrent.Promise;
+import io.netty.util.AsciiString;
 
 import java.io.IOException;
 import java.net.URISyntaxException;
@@ -57,14 +38,17 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.web.WebHdfsTestUtil;
+import org.apache.hadoop.hdfs.web.http2.ClientHttp2ConnectionHandler;
+import org.apache.hadoop.hdfs.web.http2.Http2DataReceiver;
+import org.apache.hadoop.hdfs.web.http2.Http2StreamBootstrap;
+import org.apache.hadoop.hdfs.web.http2.Http2StreamChannel;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-public class TestDtpHttp2 {
+import com.google.common.io.ByteStreams;
 
-  private static final Http2FrameLogger FRAME_LOGGER = new Http2FrameLogger(
-      LogLevel.INFO, TestDtpHttp2.class);
+public class TestDtpHttp2 {
 
   private static final Configuration CONF = WebHdfsTestUtil.createConf();
 
@@ -74,16 +58,15 @@
 
   private static Channel CHANNEL;
 
-  private static Http2ResponseHandler RESPONSE_HANDLER;
+  private static Http2StreamChannel STREAM;
 
   @BeforeClass
   public static void setUp() throws IOException, URISyntaxException,
-      TimeoutException {
+      TimeoutException, InterruptedException, ExecutionException {
     CLUSTER = new MiniDFSCluster.Builder(CONF).numDataNodes(1).build();
     CLUSTER.waitActive();
 
-    RESPONSE_HANDLER = new Http2ResponseHandler();
-    Bootstrap bootstrap =
+    CHANNEL =
         new Bootstrap()
             .group(WORKER_GROUP)
             .channel(NioSocketChannel.class)
@@ -93,22 +76,31 @@
 
               @Override
               protected void initChannel(Channel ch) throws Exception {
-                Http2Connection connection = new DefaultHttp2Connection(false);
-                Http2ConnectionHandler connectionHandler =
-                    new HttpToHttp2ConnectionHandler(connection, frameReader(),
-                        frameWriter(), new DelegatingDecompressorFrameListener(
-                            connection, new InboundHttp2ToHttpAdapter.Builder(
-                                connection).maxContentLength(Integer.MAX_VALUE)
-                                .propagateSettings(true).build()));
-                ch.pipeline().addLast(connectionHandler, RESPONSE_HANDLER);
+                ch.pipeline().addLast(
+                  ClientHttp2ConnectionHandler.create(ch, CONF));
               }
-            });
-    CHANNEL = bootstrap.connect().syncUninterruptibly().channel();
+            }).connect().syncUninterruptibly().channel();
+    STREAM =
+        new Http2StreamBootstrap()
+            .channel(CHANNEL)
+            .headers(
+              new DefaultHttp2Headers().method(
+                new AsciiString(HttpMethod.GET.name())).path(
+                new AsciiString("/"))).endStream(true)
+            .handler(new ChannelInitializer<Channel>() {
 
+              @Override
+              protected void initChannel(Channel ch) throws Exception {
+                ch.pipeline().addLast(new Http2DataReceiver());
+              }
+            }).connect().syncUninterruptibly().get();
   }
 
   @AfterClass
   public static void tearDown() throws IOException {
+    if (STREAM != null) {
+      STREAM.close();
+    }
     if (CHANNEL != null) {
       CHANNEL.close().syncUninterruptibly();
     }
@@ -118,28 +110,14 @@
     }
   }
 
-  private static Http2FrameReader frameReader() {
-    return new Http2InboundFrameLogger(new DefaultHttp2FrameReader(),
-        FRAME_LOGGER);
-  }
-
-  private static Http2FrameWriter frameWriter() {
-    return new Http2OutboundFrameLogger(new DefaultHttp2FrameWriter(),
-        FRAME_LOGGER);
-  }
-
   @Test
-  public void test() throws InterruptedException, ExecutionException {
-    int streamId = 3;
-    FullHttpRequest request =
-        new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/");
-    request.headers().add(HttpUtil.ExtensionHeaderNames.STREAM_ID.text(),
-      streamId);
-    Promise<FullHttpResponse> promise = CHANNEL.eventLoop().newPromise();
-    RESPONSE_HANDLER.put(streamId, promise);
-    CHANNEL.writeAndFlush(request);
-    assertEquals(HttpResponseStatus.OK, promise.get().status());
-    ByteBuf content = promise.get().content();
-    assertEquals("HTTP/2 DTP", content.toString(StandardCharsets.UTF_8));
+  public void test() throws InterruptedException, ExecutionException,
+      IOException {
+    Http2DataReceiver receiver = STREAM.pipeline().get(Http2DataReceiver.class);
+    assertEquals(HttpResponseStatus.OK.codeAsText(), receiver.waitForResponse()
+        .status());
+    assertEquals("HTTP/2 DTP",
+      new String(ByteStreams.toByteArray(receiver.content()),
+          StandardCharsets.UTF_8));
   }
 }
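
The rewritten test leans on two Http2DataReceiver calls whose contract is worth spelling out. Based on how they are used here and in the new tests below, waitForResponse() presumably blocks until the response HEADERS frame arrives and returns the headers (hence the .status() call), while content() exposes the stream's DATA frames as a blocking InputStream that reaches end-of-stream when the server half-closes. A hedged usage sketch:

    // Hedged sketch of the Http2DataReceiver contract assumed by the tests.
    Http2DataReceiver receiver =
        stream.pipeline().get(Http2DataReceiver.class);
    Http2Headers responseHeaders = receiver.waitForResponse(); // blocks
    try (InputStream body = receiver.content()) {
      // Read completes when the peer sends END_STREAM.
      byte[] all = ByteStreams.toByteArray(body);
    }
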
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/http2/AbstractTestHttp2Client.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/http2/AbstractTestHttp2Client.java
new file mode 100644
index 0000000..d7c6d16
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/http2/AbstractTestHttp2Client.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.web.http2;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.http.HttpMethod;
+import io.netty.handler.codec.http2.DefaultHttp2Headers;
+import io.netty.util.ByteString;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.ExecutionException;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.eclipse.jetty.http2.server.HTTP2CServerConnectionFactory;
+import org.eclipse.jetty.server.HttpConfiguration;
+import org.eclipse.jetty.server.Request;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.ServerConnector;
+import org.eclipse.jetty.server.handler.AbstractHandler;
+
+public abstract class AbstractTestHttp2Client {
+
+  protected EventLoopGroup workerGroup = new NioEventLoopGroup();
+
+  protected Server server;
+
+  protected final class EchoHandler extends AbstractHandler {
+
+    @Override
+    public void handle(String target, Request baseRequest,
+        HttpServletRequest request, HttpServletResponse response)
+        throws IOException, ServletException {
+      byte[] msg = IOUtils.toByteArray(request.getInputStream());
+      response.getOutputStream().write(msg);
+      response.getOutputStream().flush();
+    }
+
+  }
+
+  protected Channel channel;
+
+  protected void start() throws Exception {
+    server = new Server();
+    ServerConnector connector =
+        new ServerConnector(server, new HTTP2CServerConnectionFactory(
+            new HttpConfiguration()));
+    connector.setPort(0);
+    server.addConnector(connector);
+    setHandler(server);
+    server.start();
+    channel =
+        new Bootstrap()
+            .group(workerGroup)
+            .channel(NioSocketChannel.class)
+            .handler(new ChannelInitializer<Channel>() {
+
+              @Override
+              protected void initChannel(Channel ch) throws Exception {
+                ch.pipeline().addLast(
+                  ClientHttp2ConnectionHandler.create(ch, new Configuration()));
+              }
+
+            })
+            .connect(
+              new InetSocketAddress("127.0.0.1", connector.getLocalPort()))
+            .sync().channel();
+  }
+
+  protected void stop() throws Exception {
+    if (channel != null) {
+      channel.close();
+    }
+    if (server != null) {
+      server.stop();
+    }
+    workerGroup.shutdownGracefully();
+  }
+
+  protected Http2StreamChannel connect(boolean endStream)
+      throws InterruptedException, ExecutionException {
+    return new Http2StreamBootstrap()
+        .channel(channel)
+        .handler(new ChannelInitializer<Http2StreamChannel>() {
+
+          @Override
+          protected void initChannel(Http2StreamChannel ch) throws Exception {
+            ch.pipeline().addLast(new Http2DataReceiver());
+          }
+
+        })
+        .headers(
+          new DefaultHttp2Headers()
+              .method(
+                new ByteString(HttpMethod.GET.name(), StandardCharsets.UTF_8))
+              .path(new ByteString("/", StandardCharsets.UTF_8))
+              .scheme(new ByteString("http", StandardCharsets.UTF_8))
+              .authority(
+                new ByteString("127.0.0.1:"
+                    + ((InetSocketAddress) channel.remoteAddress()).getPort(),
+                    StandardCharsets.UTF_8))).endStream(endStream).connect()
+        .sync().get();
+  }
+
+  protected abstract void setHandler(Server server);
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/http2/TestHttp2Client.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/http2/TestHttp2Client.java
new file mode 100644
index 0000000..ab3f6d4
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/http2/TestHttp2Client.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.web.http2;
+
+import static org.junit.Assert.assertEquals;
+import io.netty.handler.codec.http.HttpResponseStatus;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.ExecutionException;
+
+import org.eclipse.jetty.server.Server;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.io.ByteStreams;
+
+public class TestHttp2Client extends AbstractTestHttp2Client {
+
+  @Before
+  public void setUp() throws Exception {
+    start();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    stop();
+  }
+
+  @Test
+  public void test() throws InterruptedException, ExecutionException,
+      IOException {
+    Http2StreamChannel stream = connect(false);
+    Http2DataReceiver receiver = stream.pipeline().get(Http2DataReceiver.class);
+    stream.write(stream.alloc().buffer()
+        .writeBytes("Hello World".getBytes(StandardCharsets.UTF_8)));
+    stream.writeAndFlush(LastHttp2Message.get());
+    assertEquals(HttpResponseStatus.OK.codeAsText(),
+      receiver.waitForResponse().status());
+    assertEquals("Hello World",
+      new String(ByteStreams.toByteArray(receiver.content()),
+          StandardCharsets.UTF_8));
+  }
+
+  @Override
+  protected void setHandler(Server server) {
+    server.setHandler(new EchoHandler());
+  }
+}
\ No newline at end of file
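
Note the two ways the new client API finishes the request side of a stream: a body-less request sets endStream(true) on the bootstrap (as TestDtpHttp2 does), while a request with a body bootstraps with endStream(false), writes its ByteBufs, and then writes the LastHttp2Message sentinel, which presumably tells the connection handler to flag END_STREAM on the final DATA frame. In sketch form (the sentinel semantics are inferred, not stated in the patch):

    // With a body: half-close by writing the sentinel (semantics assumed).
    stream.write(stream.alloc().buffer().writeBytes(payload)); // DATA
    stream.writeAndFlush(LastHttp2Message.get()); // final DATA + END_STREAM
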
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/http2/TestHttp2ClientMultiThread.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/http2/TestHttp2ClientMultiThread.java
new file mode 100644
index 0000000..1c483c1
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/http2/TestHttp2ClientMultiThread.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.web.http2;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import io.netty.handler.codec.http.HttpResponseStatus;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.eclipse.jetty.server.Server;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.io.ByteStreams;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+public class TestHttp2ClientMultiThread extends AbstractTestHttp2Client {
+
+  private int requestCount = 10000;
+
+  private int concurrency = 10;
+
+  private ExecutorService executor = Executors.newFixedThreadPool(concurrency,
+    new ThreadFactoryBuilder().setNameFormat("Echo-Client-%d").setDaemon(true)
+        .build());
+
+  @Before
+  public void setUp() throws Exception {
+    start();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    stop();
+  }
+
+  private void testEcho() throws InterruptedException, ExecutionException,
+      IOException {
+    Http2StreamChannel stream = connect(false);
+    Http2DataReceiver receiver = stream.pipeline().get(Http2DataReceiver.class);
+    byte[] b = new byte[ThreadLocalRandom.current().nextInt(10, 100)];
+    ThreadLocalRandom.current().nextBytes(b);
+    stream.write(stream.alloc().buffer(b.length).writeBytes(b));
+    stream.writeAndFlush(LastHttp2Message.get());
+    assertEquals(HttpResponseStatus.OK.codeAsText(),
+      receiver.waitForResponse().status());
+    assertArrayEquals(b, ByteStreams.toByteArray(receiver.content()));
+  }
+
+  @Test
+  public void test() throws InterruptedException {
+    final AtomicBoolean succ = new AtomicBoolean(true);
+    for (int i = 0; i < requestCount; i++) {
+      executor.execute(new Runnable() {
+
+        @Override
+        public void run() {
+          try {
+            testEcho();
+          } catch (Throwable t) {
+            t.printStackTrace();
+            succ.set(false);
+          }
+        }
+      });
+    }
+    executor.shutdown();
+    assertTrue(executor.awaitTermination(5, TimeUnit.MINUTES));
+    assertTrue(succ.get());
+  }
+
+  @Override
+  protected void setHandler(Server server) {
+    server.setHandler(new EchoHandler());
+  }
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/http2/TestHttp2DataReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/http2/TestHttp2DataReceiver.java
new file mode 100644
index 0000000..601f7c9
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/http2/TestHttp2DataReceiver.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.web.http2;
+
+import static org.junit.Assert.assertEquals;
+import io.netty.handler.codec.http.HttpResponseStatus;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ThreadLocalRandom;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.eclipse.jetty.server.Request;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.handler.AbstractHandler;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.io.ByteStreams;
+import com.google.common.io.Files;
+
+public class TestHttp2DataReceiver extends AbstractTestHttp2Client {
+
+  private File largeFile = new File(".largeFile");
+
+  @Before
+  public void setUp() throws Exception {
+    byte[] b = new byte[64 * 1024];
+    try (FileOutputStream out = new FileOutputStream(largeFile)) {
+      for (int i = 0; i < 1024; i++) {
+        ThreadLocalRandom.current().nextBytes(b);
+        out.write(b);
+      }
+    }
+    largeFile.deleteOnExit();
+    start();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    stop();
+    largeFile.delete();
+  }
+
+  @Override
+  protected void setHandler(Server server) {
+    server.setHandler(new AbstractHandler() {
+
+      @Override
+      public void handle(String target, Request baseRequest,
+          HttpServletRequest request, HttpServletResponse response)
+          throws IOException, ServletException {
+        Files.copy(largeFile, response.getOutputStream());
+        response.getOutputStream().flush();
+      }
+    });
+  }
+
+  private void assertContentEquals(byte[] expected, byte[] actual, int length) {
+    for (int i = 0; i < length; i++) {
+      assertEquals("differ at index " + i + ", expected " + expected[i]
+          + ", actual " + actual[i], expected[i], actual[i]);
+    }
+  }
+
+  @Test
+  public void test() throws InterruptedException, ExecutionException,
+      IOException {
+    Http2StreamChannel stream = connect(true);
+    Http2DataReceiver receiver = stream.pipeline().get(Http2DataReceiver.class);
+    assertEquals(HttpResponseStatus.OK.codeAsText(), receiver.waitForResponse()
+        .status());
+    byte[] buf = new byte[4 * 1024];
+    byte[] fileBuf = new byte[buf.length];
+    try (InputStream in = receiver.content();
+        FileInputStream fileIn = new FileInputStream(largeFile)) {
+      for (;;) {
+        int read = in.read(buf);
+        if (read == -1) {
+          assertEquals(-1, fileIn.read());
+          break;
+        }
+        ByteStreams.readFully(fileIn, fileBuf, 0, read);
+        assertContentEquals(fileBuf, buf, read);
+      }
+    }
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/http2/TestHttp2Server.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/http2/TestHttp2Server.java
index 6a8495b..b6c197d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/http2/TestHttp2Server.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/http2/TestHttp2Server.java
@@ -35,6 +35,7 @@
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.hadoop.conf.Configuration;
 import org.eclipse.jetty.http.HttpFields;
 import org.eclipse.jetty.http.HttpHeader;
 import org.eclipse.jetty.http.HttpMethod;
@@ -92,7 +93,7 @@
                       throws Exception {
                     ch.pipeline().addLast(new HelloWorldHandler());
                   }
-                }));
+                }, new Configuration()));
           }
 
         }).bind(0).syncUninterruptibly().channel();
@@ -137,4 +138,4 @@
     Thread.sleep(1000);
     assertEquals(2, handlerClosedCount.get());
   }
-}
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/http2/TestHttp2ServerMultiThread.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/http2/TestHttp2ServerMultiThread.java
index e583ca3..d2fdf0f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/http2/TestHttp2ServerMultiThread.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/http2/TestHttp2ServerMultiThread.java
@@ -44,6 +44,7 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.hadoop.conf.Configuration;
 import org.eclipse.jetty.http.HttpFields;
 import org.eclipse.jetty.http.HttpHeader;
 import org.eclipse.jetty.http.HttpMethod;
@@ -83,8 +84,7 @@
     @Override
     protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg)
         throws Exception {
-      ByteBuf out = msg.readBytes(msg.readableBytes());
-      ctx.writeAndFlush(out);
+      ctx.writeAndFlush(msg.readBytes(msg.readableBytes()));
     }
 
     @Override
@@ -133,7 +133,7 @@
                       throws Exception {
                     ch.pipeline().addLast(new DispatchHandler());
                   }
-                }));
+                }, new Configuration()));
           }
 
         }).bind(0).syncUninterruptibly().channel();
@@ -203,5 +203,4 @@
     Thread.sleep(1000);
     assertEquals(requestCount, handlerClosedCount.get());
   }
-
-}
+}
\ No newline at end of file
diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml
index e70acca..1c2cfad 100644
--- a/hadoop-project/pom.xml
+++ b/hadoop-project/pom.xml
@@ -465,6 +465,11 @@
         <version>2.5</version>
       </dependency>
       <dependency>
+        <groupId>javax.servlet</groupId>
+        <artifactId>javax.servlet-api</artifactId>
+        <version>3.1.0</version>
+      </dependency>
+      <dependency>
         <groupId>org.mortbay.jetty</groupId>
         <artifactId>jetty</artifactId>
         <version>${jetty.version}</version>
@@ -585,14 +590,19 @@
       <dependency>
         <groupId>io.netty</groupId>
         <artifactId>netty-all</artifactId>
-        <version>4.1.0.Beta5</version>
+        <version>4.1.0.Beta6</version>
       </dependency>
 
       <dependency>
         <groupId>org.eclipse.jetty.http2</groupId>
         <artifactId>http2-client</artifactId>
         <version>9.3.0.M2</version>
-        <scope>test</scope>
+      </dependency>
+
+      <dependency>
+        <groupId>org.eclipse.jetty.http2</groupId>
+        <artifactId>http2-server</artifactId>
+        <version>9.3.0.M2</version>
       </dependency>
 
       <dependency>