HDFS-8671 Add client support for HTTP/2 stream channels
diff --git a/hadoop-hdfs-project/hadoop-hdfs/pom.xml b/hadoop-hdfs-project/hadoop-hdfs/pom.xml
index 2cb1716..de6be26 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/pom.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/pom.xml
@@ -199,6 +199,11 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>org.eclipse.jetty.http2</groupId>
+ <artifactId>http2-server</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-kms</artifactId>
<classifier>classes</classifier>
@@ -221,6 +226,40 @@
<artifactId>mockserver-netty</artifactId>
<version>3.9.2</version>
<scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-buffer</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-codec</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-codec-http</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-common</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-handler</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-transport</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-codec-socks</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>javax.servlet</groupId>
+ <artifactId>javax.servlet-api</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
<dependency>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index cb05fa9..0366ba5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -1180,4 +1180,11 @@
@Deprecated
public static final long DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_DEFAULT =
HdfsClientConfigKeys.DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_DEFAULT;
+
+ // http2 related configs
+ public static final String DFS_HTTP2_INITIAL_WINDOW_SIZE = "dfs.http2.initial.windows.size";
+ public static final int DFS_HTTP2_INITIAL_WINDOW_SIZE_DEFAULT = 64 * 1024;
+
+ public static final String DFS_HTTP2_WINDOW_UPDATE_RATIO = "dfs.http2.window.update.ratio";
+ public static final float DFS_HTTP2_WINDOW_UPDATE_RATIO_DEFAULT = 0.5f;
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/PortUnificationServerHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/PortUnificationServerHandler.java
index e5c5256..ce8a2cc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/PortUnificationServerHandler.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/PortUnificationServerHandler.java
@@ -24,6 +24,7 @@
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http2.Http2CodecUtil;
+import io.netty.handler.codec.http2.Http2Exception;
import io.netty.handler.stream.ChunkedWriteHandler;
import java.net.InetSocketAddress;
@@ -66,7 +67,7 @@
new URLDispatcher(proxyHost, conf, confForCreate));
}
- private void configureHttp2(ChannelHandlerContext ctx) {
+ private void configureHttp2(ChannelHandlerContext ctx) throws Http2Exception {
ctx.pipeline().addLast(
ServerHttp2ConnectionHandler.create(ctx.channel(),
new ChannelInitializer<Http2StreamChannel>() {
@@ -75,7 +76,7 @@
protected void initChannel(Http2StreamChannel ch) throws Exception {
ch.pipeline().addLast(new DtpChannelHandler());
}
- }));
+ }, conf));
}
@Override
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/AbstractHttp2EventListener.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/AbstractHttp2EventListener.java
new file mode 100644
index 0000000..2ab2358
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/AbstractHttp2EventListener.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.web.http2;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.http2.Http2Connection;
+import io.netty.handler.codec.http2.Http2Connection.PropertyKey;
+import io.netty.handler.codec.http2.Http2Error;
+import io.netty.handler.codec.http2.Http2EventAdapter;
+import io.netty.handler.codec.http2.Http2Exception;
+import io.netty.handler.codec.http2.Http2Headers;
+import io.netty.handler.codec.http2.Http2Stream;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Base class of an HTTP/2 FrameListener and EventListener to manage
+ * {@link Http2StreamChannel}s.
+ * <p>
+ * We do not handle onRstStreamRead here, a stream that being reset will also
+ * call onStreamClosed. The upper layer should not rely on a reset event.
+ */
+@InterfaceAudience.Private
+public abstract class AbstractHttp2EventListener extends Http2EventAdapter {
+
+ protected final Channel parentChannel;
+
+ protected final Http2Connection conn;
+
+ protected final PropertyKey subChannelPropKey;
+
+ protected final AtomicInteger numActiveStreams = new AtomicInteger(0);
+
+ protected AbstractHttp2EventListener(Channel parentChannel,
+ Http2Connection conn) {
+ this.parentChannel = parentChannel;
+ this.conn = conn;
+ this.subChannelPropKey = conn.newKey();
+ }
+
+ protected abstract void initChannelOnStreamActive(
+ Http2StreamChannel subChannel);
+
+ @Override
+ public void onStreamActive(final Http2Stream stream) {
+ numActiveStreams.incrementAndGet();
+ Http2StreamChannel subChannel =
+ new Http2StreamChannel(parentChannel, stream);
+ stream.setProperty(subChannelPropKey, subChannel);
+ initChannelOnStreamActive(subChannel);
+ }
+
+ @Override
+ public void onStreamClosed(Http2Stream stream) {
+ numActiveStreams.decrementAndGet();
+ Http2StreamChannel subChannel = stream.removeProperty(subChannelPropKey);
+ if (subChannel != null && subChannel.isRegistered()) {
+ subChannel.setClosed();
+ }
+ }
+
+ private Http2StreamChannel getSubChannel(int streamId) throws Http2Exception {
+ Http2StreamChannel subChannel =
+ conn.stream(streamId).getProperty(subChannelPropKey);
+ if (subChannel == null) {
+ throw Http2Exception.streamError(streamId, Http2Error.INTERNAL_ERROR,
+ "No sub channel found");
+ }
+ return subChannel;
+ }
+
+ private void writeInbound(int streamId, Object msg, boolean endOfStream,
+ int pendingBytes) throws Http2Exception {
+ Http2StreamChannel subChannel = getSubChannel(streamId);
+ subChannel.writeInbound(msg, pendingBytes);
+ if (endOfStream) {
+ subChannel.writeInbound(LastHttp2Message.get(), 0);
+ }
+ if (subChannel.config().isAutoRead()) {
+ subChannel.read();
+ }
+ }
+
+ @Override
+ public void onHeadersRead(ChannelHandlerContext ctx, int streamId,
+ Http2Headers headers, int padding, boolean endOfStream)
+ throws Http2Exception {
+ writeInbound(streamId, headers, endOfStream, 0);
+ }
+
+ @Override
+ public void onHeadersRead(ChannelHandlerContext ctx, int streamId,
+ Http2Headers headers, int streamDependency, short weight,
+ boolean exclusive, int padding, boolean endOfStream)
+ throws Http2Exception {
+ onHeadersRead(ctx, streamId, headers, padding, endOfStream);
+ }
+
+ @Override
+ public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data,
+ int padding, boolean endOfStream) throws Http2Exception {
+ int pendingBytes = data.readableBytes() + padding;
+ writeInbound(streamId, data.retain(), endOfStream, pendingBytes);
+ return 0;
+ }
+
+ public int numActiveStreams() {
+ return numActiveStreams.get();
+ }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/ByteBufferReadableInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/ByteBufferReadableInputStream.java
new file mode 100644
index 0000000..a54715a
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/ByteBufferReadableInputStream.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.web.http2;
+
+import java.io.InputStream;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.ByteBufferReadable;
+
+/**
+ * A place holder.
+ */
+@InterfaceAudience.Private
+public abstract class ByteBufferReadableInputStream extends InputStream
+ implements ByteBufferReadable {
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/ClientHttp2ConnectionHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/ClientHttp2ConnectionHandler.java
new file mode 100644
index 0000000..42be10d
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/ClientHttp2ConnectionHandler.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.web.http2;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.ChannelPromiseAggregator;
+import io.netty.handler.codec.UnsupportedMessageTypeException;
+import io.netty.handler.codec.http2.DefaultHttp2Connection;
+import io.netty.handler.codec.http2.Http2Connection;
+import io.netty.handler.codec.http2.Http2ConnectionDecoder;
+import io.netty.handler.codec.http2.Http2ConnectionEncoder;
+import io.netty.handler.codec.http2.Http2ConnectionHandler;
+import io.netty.handler.codec.http2.Http2Exception;
+import io.netty.handler.codec.http2.Http2FrameLogger;
+import io.netty.handler.codec.http2.Http2Headers;
+import io.netty.handler.logging.LogLevel;
+import io.netty.util.concurrent.Promise;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * An {@link Http2ConnectionHandler} used at client side.
+ */
+@InterfaceAudience.Private
+public class ClientHttp2ConnectionHandler extends Http2ConnectionHandler {
+
+ private static final Log LOG = LogFactory
+ .getLog(ClientHttp2ConnectionHandler.class);
+
+ private static final Http2FrameLogger FRAME_LOGGER = new Http2FrameLogger(
+ LogLevel.INFO, ClientHttp2ConnectionHandler.class);
+
+ private AtomicInteger nextStreamId = new AtomicInteger(3);
+
+ private final ClientHttp2EventListener listener;
+
+ private ClientHttp2ConnectionHandler(Http2ConnectionDecoder decoder,
+ Http2ConnectionEncoder encoder) {
+ super(decoder, encoder);
+ this.listener = (ClientHttp2EventListener) decoder.listener();
+ }
+
+ private int nextStreamId() {
+ return nextStreamId.getAndAdd(2);
+ }
+
+ private void writeHeaders(ChannelHandlerContext ctx,
+ Http2ConnectionEncoder encoder, final int streamId, Http2Headers headers,
+ boolean endStream, ChannelPromise promise,
+ final Promise<Http2StreamChannel> callback) {
+ encoder.writeHeaders(ctx, streamId, headers, 0, endStream, promise)
+ .addListener(new ChannelFutureListener() {
+
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ if (future.isSuccess()) {
+ callback
+ .setSuccess(connection().stream(streamId)
+ .<Http2StreamChannel> getProperty(
+ listener.subChannelPropKey));
+ } else {
+ callback.setFailure(future.cause());
+ }
+ }
+ });
+ }
+
+ @Override
+ public void write(ChannelHandlerContext ctx, Object msg,
+ ChannelPromise promise) throws Exception {
+ if (msg instanceof StartHttp2StreamRequest) {
+ final StartHttp2StreamRequest request = (StartHttp2StreamRequest) msg;
+ final int streamId = nextStreamId();
+ Http2ConnectionEncoder encoder = encoder();
+ if (request.data.isReadable()) {
+ ChannelPromiseAggregator aggregator =
+ new ChannelPromiseAggregator(promise);
+ ChannelPromise headerPromise = ctx.newPromise();
+ aggregator.add(headerPromise);
+ writeHeaders(ctx, encoder, streamId, request.headers, false,
+ headerPromise, request.promise);
+ ChannelPromise dataPromise = ctx.newPromise();
+ aggregator.add(dataPromise);
+ encoder.writeData(ctx, streamId, request.data, 0, request.endStream,
+ dataPromise);
+ } else {
+ writeHeaders(ctx, encoder, streamId, request.headers,
+ request.endStream, promise, request.promise);
+ }
+ } else {
+ throw new UnsupportedMessageTypeException(msg,
+ StartHttp2StreamRequest.class);
+ }
+ }
+
+ public int numActiveStreams() {
+ return listener.numActiveStreams();
+ }
+
+ public int currentStreamId() {
+ return nextStreamId.get();
+ }
+
+ private static final Http2Util.Http2ConnectionHandlerFactory<ClientHttp2ConnectionHandler> FACTORY =
+ new Http2Util.Http2ConnectionHandlerFactory<ClientHttp2ConnectionHandler>() {
+
+ @Override
+ public ClientHttp2ConnectionHandler create(
+ Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder) {
+ return new ClientHttp2ConnectionHandler(decoder, encoder);
+ }
+ };
+
+ public static ClientHttp2ConnectionHandler create(Channel channel,
+ Configuration conf) throws Http2Exception {
+ Http2Connection conn = new DefaultHttp2Connection(false);
+ ClientHttp2EventListener listener =
+ new ClientHttp2EventListener(channel, conn);
+ return Http2Util.create(conf, conn, listener, FACTORY,
+ LOG.isDebugEnabled() ? FRAME_LOGGER : null);
+ }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/ClientHttp2EventListener.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/ClientHttp2EventListener.java
new file mode 100644
index 0000000..f4bbdba
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/ClientHttp2EventListener.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.web.http2;
+
+import io.netty.channel.Channel;
+import io.netty.handler.codec.http2.Http2Connection;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * An HTTP/2 FrameListener and EventListener to manage
+ * {@link Http2StreamChannel}s for client.
+ */
+@InterfaceAudience.Private
+public class ClientHttp2EventListener extends AbstractHttp2EventListener {
+
+ public ClientHttp2EventListener(Channel parentChannel, Http2Connection conn) {
+ super(parentChannel, conn);
+ }
+
+ @Override
+ protected void initChannelOnStreamActive(Http2StreamChannel subChannel) {
+ // disable read until pipeline initialized
+ subChannel.config().setAutoRead(false);
+ }
+
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/Http2DataReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/Http2DataReceiver.java
new file mode 100644
index 0000000..7cfbcce
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/Http2DataReceiver.java
@@ -0,0 +1,326 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.web.http2;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelConfig;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.handler.codec.http2.Http2Headers;
+import io.netty.handler.timeout.ReadTimeoutException;
+import io.netty.handler.timeout.ReadTimeoutHandler;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InterruptedIOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+import java.util.Deque;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * A helper class that wrapper the HTTP/2 data frame as an {@link InputStream}.
+ * <p>
+ * Notice that, this classes can be used together with
+ * {@link ReadTimeoutHandler} to limit the waiting time when reading data.
+ */
+@InterfaceAudience.Private
+public class Http2DataReceiver extends ChannelInboundHandlerAdapter {
+
+ private static final Component END_OF_STREAM = new Component(null, 0);
+
+ private static final EOFException EOF = new EOFException();
+
+ private static final class Component {
+
+ public final ByteBuf buf;
+
+ public final int length;
+
+ public Component(ByteBuf buf) {
+ this(buf, buf.readableBytes());
+ }
+
+ public Component(ByteBuf buf, int length) {
+ this.buf = buf;
+ this.length = length;
+ }
+
+ }
+
+ private final Deque<Component> queue = new ArrayDeque<Component>();
+
+ private int queuedBytes;
+
+ private Channel channel;
+
+ private Throwable error;
+
+ private Http2Headers headers;
+
+ private final ByteBufferReadableInputStream contentInput =
+ new ByteBufferReadableInputStream() {
+
+ @Override
+ public int read() throws IOException {
+ Component comp = peekUntilAvailable();
+ if (comp == END_OF_STREAM) {
+ return -1;
+ }
+ int b = comp.buf.readByte() & 0xFF;
+ if (!comp.buf.isReadable()) {
+ removeHead();
+ }
+ return b;
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ Component comp = peekUntilAvailable();
+ if (comp == END_OF_STREAM) {
+ return -1;
+ }
+ int bufReadableBytes = comp.buf.readableBytes();
+ if (len >= bufReadableBytes) {
+ comp.buf.readBytes(b, off, bufReadableBytes);
+ removeHead();
+ return bufReadableBytes;
+ } else {
+ comp.buf.readBytes(b, off, len);
+ return len;
+ }
+ }
+
+ @Override
+ public long skip(long n) throws IOException {
+ Component comp = peekUntilAvailable();
+ if (comp == END_OF_STREAM) {
+ return 0L;
+ }
+ int bufReadableBytes = comp.buf.readableBytes();
+ if (n >= bufReadableBytes) {
+ removeHead();
+ return bufReadableBytes;
+ } else {
+ comp.buf.skipBytes((int) n);
+ return n;
+ }
+ }
+
+ @Override
+ public int read(ByteBuffer bb) throws IOException {
+ Component comp = peekUntilAvailable();
+ if (comp == END_OF_STREAM) {
+ return -1;
+ }
+ int bbRemaining = bb.remaining();
+ int bufReadableBytes = comp.buf.readableBytes();
+ if (bbRemaining >= bufReadableBytes) {
+ int toRestoredLimit = bb.limit();
+ bb.limit(bb.position() + bufReadableBytes);
+ comp.buf.readBytes(bb);
+ bb.limit(toRestoredLimit);
+ removeHead();
+ return bufReadableBytes;
+ } else {
+ comp.buf.readBytes(bb);
+ return bbRemaining;
+ }
+ }
+
+ private boolean closed = false;
+
+ @Override
+ public void close() throws IOException {
+ if (closed) {
+ return;
+ }
+ synchronized (queue) {
+ if (error == null) {
+ error = EOF;
+ }
+ }
+ channel.close().addListener(new ChannelFutureListener() {
+
+ @Override
+ public void operationComplete(ChannelFuture future)
+ throws Exception {
+ synchronized (queue) {
+ for (Component c; (c = queue.peek()) != null;) {
+ if (c == END_OF_STREAM) {
+ return;
+ }
+ c.buf.release();
+ queue.remove();
+ }
+ }
+ }
+ });
+ }
+
+ };
+
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg)
+ throws Exception {
+ if (msg == LastHttp2Message.get()) {
+ enqueue(END_OF_STREAM);
+ } else if (msg instanceof Http2Headers) {
+ synchronized (queue) {
+ headers = (Http2Headers) msg;
+ queue.notifyAll();
+ }
+ } else if (msg instanceof ByteBuf) {
+ ByteBuf buf = (ByteBuf) msg;
+ if (buf.isReadable()) {
+ enqueue(new Component(buf));
+ } else {
+ buf.release();
+ }
+ } else {
+ ctx.fireChannelRead(msg);
+ }
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
+ throws Exception {
+ synchronized (queue) {
+ if (error == null) {
+ error = cause;
+ queue.notifyAll();
+ }
+ }
+ }
+
+ @Override
+ public void channelActive(ChannelHandlerContext ctx) throws Exception {
+ channel = ctx.channel();
+ }
+
+ @Override
+ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+ synchronized (queue) {
+ if (error != null) {
+ return;
+ }
+ Component lastComp = queue.peekLast();
+ if (lastComp == END_OF_STREAM) {
+ return;
+ }
+ error = EOF;
+ notifyAll();
+ }
+ }
+
+ private void enqueue(Component comp) {
+ synchronized (queue) {
+ queuedBytes += comp.length;
+ if (queuedBytes >= channel.config().getWriteBufferHighWaterMark()) {
+ channel.config().setAutoRead(false);
+ }
+ queue.add(comp);
+ queue.notifyAll();
+ }
+ }
+
+ private Component peekUntilAvailable() throws IOException {
+ Throwable cause;
+ synchronized (queue) {
+ for (;;) {
+ if (!queue.isEmpty()) {
+ return queue.peek();
+ }
+ if (error != null) {
+ cause = error;
+ break;
+ }
+ try {
+ queue.wait();
+ } catch (InterruptedException e) {
+ throw new InterruptedIOException();
+ }
+ }
+ }
+ propagate(cause);
+ return null;
+ }
+
+ private void removeHead() {
+ Component comp;
+ synchronized (queue) {
+ comp = queue.remove();
+ queuedBytes -= comp.length;
+ ChannelConfig config = channel.config();
+ if (!config.isAutoRead()
+ && queuedBytes < config.getWriteBufferLowWaterMark()) {
+ config.setAutoRead(true);
+ }
+ }
+ comp.buf.release();
+ }
+
+ private void propagate(Throwable cause) throws IOException {
+ if (cause == ReadTimeoutException.INSTANCE) {
+ throw new IOException("Read timeout");
+ } else if (cause == EOF) {
+ throw new IOException("Stream reset by peer: " + channel.remoteAddress());
+ } else if (cause instanceof IOException) {
+ throw (IOException) cause;
+ } else if (cause instanceof RuntimeException) {
+ throw (RuntimeException) cause;
+ } else {
+ throw new IOException(cause);
+ }
+ }
+
+ public Http2Headers waitForResponse() throws IOException {
+ Throwable cause;
+ synchronized (queue) {
+ for (;;) {
+ if (error != null) {
+ cause = error;
+ break;
+ }
+ if (headers != null) {
+ return headers;
+ }
+ try {
+ queue.wait();
+ } catch (InterruptedException e) {
+ throw new InterruptedIOException();
+ }
+ }
+ }
+ propagate(cause);
+ return null;
+ }
+
+ /**
+ * The returned stream is not thread safe.
+ */
+ public ByteBufferReadableInputStream content() {
+ return contentInput;
+ }
+
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/Http2StreamBootstrap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/Http2StreamBootstrap.java
new file mode 100644
index 0000000..4d0b32b
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/Http2StreamBootstrap.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.web.http2;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.EventLoop;
+import io.netty.handler.codec.http2.Http2Headers;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.FutureListener;
+import io.netty.util.concurrent.Promise;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * A {@link Http2StreamBootstrap} that makes it easy to bootstrap a
+ * {@link Http2StreamChannel} to use for clients.
+ * <p>
+ * Call connect when you finish set up other things to establish an
+ * {@link Http2StreamChannel}.
+ */
+@InterfaceAudience.Private
+public class Http2StreamBootstrap {
+ private Channel channel;
+
+ private Http2Headers headers;
+
+ private ByteBuf data = Unpooled.EMPTY_BUFFER;
+
+ private boolean endStream;
+
+ private ChannelHandler handler;
+
+ /**
+ * Set the {@link Channel} this HTTP/2 stream is running on.
+ */
+ public Http2StreamBootstrap channel(Channel channel) {
+ this.channel = channel;
+ return this;
+ }
+
+ /**
+ * Set the request headers.
+ */
+ public Http2StreamBootstrap headers(Http2Headers headers) {
+ this.headers = headers;
+ return this;
+ }
+
+ /**
+ * Set the request data.
+ * <p>
+ * This is used to avoid one context-switch if you only need to send a small
+ * piece of data.
+ */
+ public Http2StreamBootstrap data(ByteBuf data) {
+ this.data = data;
+ return this;
+ }
+
+ /**
+ * Set whether there is no data after the headers being sent.
+ * <p>
+ * Default is <tt>false </tt>which means you could still send data using the
+ * returned {@link Http2StreamChannel}.
+ */
+ public Http2StreamBootstrap endStream(boolean endStream) {
+ this.endStream = endStream;
+ return this;
+ }
+
+ /**
+ * The {@link ChannelHandler} which is used to serve request.
+ * <p>
+ * Typically, you should use a {@link ChannelInitializer} here.
+ */
+ public Http2StreamBootstrap handler(ChannelHandler handler) {
+ this.handler = handler;
+ return this;
+ }
+
+ /**
+ * Establish the {@link Http2StreamChannel}. You can get it with the returned
+ * {@link Future}.
+ */
+ public Future<Http2StreamChannel> connect() {
+ Preconditions.checkNotNull(headers);
+ Preconditions.checkNotNull(handler);
+ final Promise<Http2StreamChannel> registeredPromise =
+ channel.eventLoop().<Http2StreamChannel> newPromise();
+
+ final StartHttp2StreamRequest request =
+ new StartHttp2StreamRequest(headers, data, endStream, channel
+ .eventLoop().<Http2StreamChannel> newPromise()
+ .addListener(new FutureListener<Http2StreamChannel>() {
+
+ @Override
+ public void operationComplete(Future<Http2StreamChannel> future)
+ throws Exception {
+ if (future.isSuccess()) {
+ final Http2StreamChannel subChannel = future.get();
+ subChannel.pipeline().addFirst(handler);
+ channel.eventLoop().register(subChannel)
+ .addListener(new ChannelFutureListener() {
+
+ @Override
+ public void operationComplete(ChannelFuture future)
+ throws Exception {
+ if (future.isSuccess()) {
+ subChannel.config().setAutoRead(true);
+ registeredPromise.setSuccess(subChannel);
+ } else {
+ registeredPromise.setFailure(future.cause());
+ }
+ }
+ });
+ } else {
+ registeredPromise.setFailure(future.cause());
+ }
+ }
+
+ }));
+ EventLoop eventLoop = channel.eventLoop();
+ if (eventLoop.inEventLoop()) {
+ channel.writeAndFlush(request);
+ } else {
+ channel.eventLoop().execute(new Runnable() {
+
+ @Override
+ public void run() {
+ channel.writeAndFlush(request);
+ }
+ });
+ }
+
+ return registeredPromise;
+ }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/Http2StreamChannel.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/Http2StreamChannel.java
index 658ffe4..ffaf3ef 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/Http2StreamChannel.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/Http2StreamChannel.java
@@ -18,9 +18,12 @@
package org.apache.hadoop.hdfs.web.http2;
import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
import io.netty.channel.AbstractChannel;
import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelMetadata;
import io.netty.channel.ChannelOutboundBuffer;
@@ -32,8 +35,11 @@
import io.netty.handler.codec.http2.Http2ConnectionEncoder;
import io.netty.handler.codec.http2.Http2ConnectionHandler;
import io.netty.handler.codec.http2.Http2Error;
+import io.netty.handler.codec.http2.Http2Exception;
import io.netty.handler.codec.http2.Http2Headers;
+import io.netty.handler.codec.http2.Http2LocalFlowController;
import io.netty.handler.codec.http2.Http2Stream;
+import io.netty.util.ReferenceCountUtil;
import io.netty.util.internal.InternalThreadLocalMap;
import java.net.SocketAddress;
@@ -64,10 +70,33 @@
private static final int MAX_READER_STACK_DEPTH = 8;
private final ChannelHandlerContext http2ConnHandlerCtx;
+
private final Http2Stream stream;
+
+ private final Http2LocalFlowController localFlowController;
+
private final Http2ConnectionEncoder encoder;
+
private final DefaultChannelConfig config;
- private final Queue<Object> inboundMessageQueue = new ArrayDeque<>();
+
+ private static final class InboundMessage {
+
+ public final Object msg;
+
+ public final int length;
+
+ public InboundMessage(Object msg, int length) {
+ this.msg = msg;
+ this.length = length;
+ }
+
+ }
+
+ private final Queue<InboundMessage> inboundMessageQueue = new ArrayDeque<>();
+
+ private boolean writePending = false;
+
+ private int pendingOutboundBytes;
private enum State {
OPEN, HALF_CLOSED_LOCAL, HALF_CLOSED_REMOTE, PRE_CLOSED, CLOSED
@@ -82,10 +111,16 @@
Http2ConnectionHandler connHandler =
(Http2ConnectionHandler) http2ConnHandlerCtx.handler();
this.stream = stream;
+ this.localFlowController =
+ connHandler.connection().local().flowController();
this.encoder = connHandler.encoder();
this.config = new DefaultChannelConfig(this);
}
+ public Http2Stream stream() {
+ return stream;
+ }
+
@Override
public ChannelConfig config() {
return config;
@@ -98,8 +133,6 @@
@Override
public boolean isActive() {
- // we create this channel after HTTP/2 stream active, so we do not have a
- // separated 'active' state.
return isOpen();
}
@@ -115,6 +148,10 @@
SocketAddress localAddress, ChannelPromise promise) {
throw new UnsupportedOperationException();
}
+
+ public void forceFlush() {
+ super.flush0();
+ }
}
@Override
@@ -149,7 +186,11 @@
@Override
protected void doClose() throws Exception {
- if (stream.state() != Http2Stream.State.CLOSED) {
+ for (InboundMessage msg; (msg = inboundMessageQueue.poll()) != null;) {
+ ReferenceCountUtil.release(msg.msg);
+ localFlowController.consumeBytes(stream, msg.length);
+ }
+ if (state != State.PRE_CLOSED) {
encoder.writeRstStream(http2ConnHandlerCtx, stream.id(),
Http2Error.INTERNAL_ERROR.code(), http2ConnHandlerCtx.newPromise());
}
@@ -157,36 +198,44 @@
}
private final Runnable readTask = new Runnable() {
-
@Override
public void run() {
ChannelPipeline pipeline = pipeline();
- int maxMessagesPerRead = config().getMaxMessagesPerRead();
- for (int i = 0; i < maxMessagesPerRead; i++) {
- Object m = inboundMessageQueue.poll();
- if (m == null) {
- break;
- }
- if (m == LastHttp2Message.get()) {
+ for (InboundMessage m; (m = inboundMessageQueue.poll()) != null;) {
+ if (m.msg == LastHttp2Message.get()) {
state =
state == State.HALF_CLOSED_LOCAL ? State.PRE_CLOSED
: State.HALF_CLOSED_REMOTE;
}
- pipeline.fireChannelRead(m);
+ try {
+ if (m.length > 0
+ && localFlowController.consumeBytes(stream, m.length)) {
+ http2ConnHandlerCtx.channel().flush();
+ }
+ } catch (Http2Exception e) {
+ // an Http2Exception at least means the stream is broken(maybe the
+ // whole connection), so we are out.
+ http2ConnHandlerCtx.pipeline().fireExceptionCaught(e);
+ return;
+ }
+ pipeline.fireChannelRead(m.msg);
}
pipeline.fireChannelReadComplete();
+ if (state == State.PRE_CLOSED) {
+ close();
+ }
}
};
@Override
protected void doBeginRead() throws Exception {
- State currentState = this.state;
- if (currentState == State.CLOSED) {
- throw new ClosedChannelException();
- }
if (inboundMessageQueue.isEmpty()) {
return;
}
+ State currentState = this.state;
+ if (remoteSideClosed(currentState)) {
+ throw new ClosedChannelException();
+ }
final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
final Integer stackDepth = threadLocals.localChannelReaderStackDepth();
if (stackDepth < MAX_READER_STACK_DEPTH) {
@@ -201,14 +250,25 @@
}
}
+ private void resumeWrite() {
+ writePending = false;
+ ((Http2Unsafe) unsafe()).forceFlush();
+ }
+
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
State currentState = this.state;
- if (currentState == State.CLOSED) {
+ if (localSideClosed(currentState)) {
throw new ClosedChannelException();
}
+ int writeBufferHighWaterMark = config().getWriteBufferHighWaterMark();
+
boolean flush = false;
for (;;) {
+ if (pendingOutboundBytes >= writeBufferHighWaterMark) {
+ writePending = true;
+ break;
+ }
Object msg = in.current();
if (msg == null) {
break;
@@ -217,15 +277,30 @@
this.state =
currentState == State.HALF_CLOSED_REMOTE ? State.PRE_CLOSED
: State.HALF_CLOSED_LOCAL;
- encoder.writeData(http2ConnHandlerCtx, stream.id(), http2ConnHandlerCtx
- .alloc().buffer(0), 0, true, http2ConnHandlerCtx.newPromise());
+ encoder.writeData(http2ConnHandlerCtx, stream.id(),
+ Unpooled.EMPTY_BUFFER, 0, true, http2ConnHandlerCtx.newPromise());
} else if (msg instanceof Http2Headers) {
encoder.writeHeaders(http2ConnHandlerCtx, stream.id(),
(Http2Headers) msg, 0, false, http2ConnHandlerCtx.newPromise());
} else if (msg instanceof ByteBuf) {
ByteBuf data = (ByteBuf) msg;
+ final int pendingBytes = data.readableBytes();
+ pendingOutboundBytes += pendingBytes;
encoder.writeData(http2ConnHandlerCtx, stream.id(), data.retain(), 0,
- false, http2ConnHandlerCtx.newPromise());
+ false, http2ConnHandlerCtx.newPromise()).addListener(
+ new ChannelFutureListener() {
+
+ @Override
+ public void operationComplete(ChannelFuture future)
+ throws Exception {
+ pendingOutboundBytes -= pendingBytes;
+ if (writePending
+ && pendingOutboundBytes <= config()
+ .getWriteBufferLowWaterMark()) {
+ resumeWrite();
+ }
+ }
+ });
} else {
throw new UnsupportedMessageTypeException(msg, Http2Headers.class,
ByteBuf.class);
@@ -236,33 +311,53 @@
if (flush) {
http2ConnHandlerCtx.channel().flush();
}
+ if (state == State.PRE_CLOSED) {
+ close();
+ }
}
- /**
- * Append a message to the inbound queue of this channel. You need to call
- * {@link #read()} if you want to pass the message to handlers.
- */
- void writeInbound(Object msg) {
- inboundMessageQueue.add(msg);
+ public void writeInbound(Object msg, int length) {
+ inboundMessageQueue.add(new InboundMessage(msg, length));
}
private static final ImmutableSet<State> REMOTE_SIDE_CLOSED_STATES =
ImmutableSet.of(State.HALF_CLOSED_REMOTE, State.PRE_CLOSED, State.CLOSED);
- /**
- * @return true if remote side finishes sending data to us.
- */
public boolean remoteSideClosed() {
+ return remoteSideClosed(state);
+ }
+
+ private boolean remoteSideClosed(State state) {
return REMOTE_SIDE_CLOSED_STATES.contains(state);
}
private static final ImmutableSet<State> LOCAL_SIDE_CLOSED_STATES =
ImmutableSet.of(State.HALF_CLOSED_LOCAL, State.PRE_CLOSED, State.CLOSED);
- /**
- * @return true if we finish sending data to remote side.
- */
public boolean localSideClosed() {
+ return localSideClosed(state);
+ }
+
+ private boolean localSideClosed(State state) {
return LOCAL_SIDE_CLOSED_STATES.contains(state);
}
+
+ public void setClosed() {
+ State currentState = this.state;
+ if (!remoteSideClosed(currentState)) {
+ writeInbound(LastHttp2Message.get(), 0);
+ if (config().isAutoRead()) {
+ read();
+ }
+ }
+ currentState = this.state;
+ if (!localSideClosed(currentState)) {
+ this.state =
+ currentState == State.HALF_CLOSED_REMOTE ? State.PRE_CLOSED
+ : State.HALF_CLOSED_LOCAL;
+ if (currentState == State.PRE_CLOSED) {
+ close();
+ }
+ }
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/Http2Util.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/Http2Util.java
new file mode 100644
index 0000000..dfe9c3e
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/Http2Util.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.web.http2;
+
+import io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder;
+import io.netty.handler.codec.http2.DefaultHttp2ConnectionEncoder;
+import io.netty.handler.codec.http2.DefaultHttp2FrameReader;
+import io.netty.handler.codec.http2.DefaultHttp2FrameWriter;
+import io.netty.handler.codec.http2.DefaultHttp2LocalFlowController;
+import io.netty.handler.codec.http2.Http2CodecUtil;
+import io.netty.handler.codec.http2.Http2Connection;
+import io.netty.handler.codec.http2.Http2ConnectionDecoder;
+import io.netty.handler.codec.http2.Http2ConnectionEncoder;
+import io.netty.handler.codec.http2.Http2ConnectionHandler;
+import io.netty.handler.codec.http2.Http2EventAdapter;
+import io.netty.handler.codec.http2.Http2Exception;
+import io.netty.handler.codec.http2.Http2FrameLogger;
+import io.netty.handler.codec.http2.Http2FrameReader;
+import io.netty.handler.codec.http2.Http2FrameWriter;
+import io.netty.handler.codec.http2.Http2InboundFrameLogger;
+import io.netty.handler.codec.http2.Http2OutboundFrameLogger;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+
+/**
+ *
+ */
+@InterfaceAudience.Private
+public class Http2Util {
+ public interface Http2ConnectionHandlerFactory<T extends Http2ConnectionHandler> {
+ T create(Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder);
+ }
+
+ @SuppressWarnings("resource")
+ static <T extends Http2ConnectionHandler> T create(Configuration conf,
+ Http2Connection conn, Http2EventAdapter listener,
+ Http2ConnectionHandlerFactory<T> handlerFactory,
+ Http2FrameLogger frameLogger) throws Http2Exception {
+ conn.addListener(listener);
+ DefaultHttp2FrameReader rawFrameReader = new DefaultHttp2FrameReader();
+ DefaultHttp2FrameWriter rawFrameWriter = new DefaultHttp2FrameWriter();
+ rawFrameWriter.maxFrameSize(Http2CodecUtil.MAX_FRAME_SIZE_UPPER_BOUND);
+ Http2FrameReader frameReader;
+ Http2FrameWriter frameWriter;
+ if (frameLogger != null) {
+ frameReader = new Http2InboundFrameLogger(rawFrameReader, frameLogger);
+ frameWriter = new Http2OutboundFrameLogger(rawFrameWriter, frameLogger);
+ } else {
+ frameReader = rawFrameReader;
+ frameWriter = rawFrameWriter;
+ }
+ DefaultHttp2LocalFlowController localFlowController =
+ new DefaultHttp2LocalFlowController(conn, frameWriter, conf.getFloat(
+ DFSConfigKeys.DFS_HTTP2_WINDOW_UPDATE_RATIO,
+ DFSConfigKeys.DFS_HTTP2_WINDOW_UPDATE_RATIO_DEFAULT));
+ int initialWindowsSize =
+ conf.getInt(DFSConfigKeys.DFS_HTTP2_INITIAL_WINDOW_SIZE,
+ DFSConfigKeys.DFS_HTTP2_INITIAL_WINDOW_SIZE_DEFAULT);
+ localFlowController.initialWindowSize(initialWindowsSize);
+ conn.local().flowController(localFlowController);
+
+ DefaultHttp2ConnectionEncoder encoder =
+ new DefaultHttp2ConnectionEncoder(conn, frameWriter);
+ DefaultHttp2ConnectionDecoder decoder =
+ new DefaultHttp2ConnectionDecoder(conn, encoder, frameReader, listener);
+ return handlerFactory.create(decoder, encoder);
+ }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/ServerHttp2ConnectionHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/ServerHttp2ConnectionHandler.java
index 1ee733d..964fd0b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/ServerHttp2ConnectionHandler.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/ServerHttp2ConnectionHandler.java
@@ -20,21 +20,18 @@
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.handler.codec.http2.DefaultHttp2Connection;
-import io.netty.handler.codec.http2.DefaultHttp2FrameReader;
-import io.netty.handler.codec.http2.DefaultHttp2FrameWriter;
import io.netty.handler.codec.http2.Http2Connection;
+import io.netty.handler.codec.http2.Http2ConnectionDecoder;
+import io.netty.handler.codec.http2.Http2ConnectionEncoder;
import io.netty.handler.codec.http2.Http2ConnectionHandler;
-import io.netty.handler.codec.http2.Http2FrameListener;
+import io.netty.handler.codec.http2.Http2Exception;
import io.netty.handler.codec.http2.Http2FrameLogger;
-import io.netty.handler.codec.http2.Http2FrameReader;
-import io.netty.handler.codec.http2.Http2FrameWriter;
-import io.netty.handler.codec.http2.Http2InboundFrameLogger;
-import io.netty.handler.codec.http2.Http2OutboundFrameLogger;
import io.netty.handler.logging.LogLevel;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
/**
* An {@link Http2ConnectionHandler} used at server side.
@@ -48,39 +45,36 @@
private static final Http2FrameLogger FRAME_LOGGER = new Http2FrameLogger(
LogLevel.INFO, ServerHttp2ConnectionHandler.class);
- private ServerHttp2ConnectionHandler(Http2Connection connection,
- Http2FrameReader frameReader, Http2FrameWriter frameWriter,
- Http2FrameListener listener) {
- super(connection, frameReader, frameWriter, listener);
+ private ServerHttp2ConnectionHandler(Http2ConnectionDecoder decoder,
+ Http2ConnectionEncoder encoder) {
+ super(decoder, encoder);
}
+ private static final Http2Util.Http2ConnectionHandlerFactory<ServerHttp2ConnectionHandler> FACTORY =
+ new Http2Util.Http2ConnectionHandlerFactory<ServerHttp2ConnectionHandler>() {
+
+ @Override
+ public ServerHttp2ConnectionHandler create(
+ Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder) {
+ return new ServerHttp2ConnectionHandler(decoder, encoder);
+ }
+ };
+
/**
* Create and initialize an {@link ServerHttp2ConnectionHandler}.
* @param channel
* @param initializer
- * @param verbose whether to log inbound and outbound HTTP/2 messages
+ * @param conf
* @return the initialized {@link ServerHttp2ConnectionHandler}
+ * @throws Http2Exception
*/
public static ServerHttp2ConnectionHandler create(Channel channel,
- ChannelInitializer<Http2StreamChannel> initializer) {
+ ChannelInitializer<Http2StreamChannel> initializer, Configuration conf)
+ throws Http2Exception {
Http2Connection conn = new DefaultHttp2Connection(true);
ServerHttp2EventListener listener =
new ServerHttp2EventListener(channel, conn, initializer);
- conn.addListener(listener);
- Http2FrameReader frameReader;
- Http2FrameWriter frameWriter;
- if (LOG.isDebugEnabled()) {
- frameReader =
- new Http2InboundFrameLogger(new DefaultHttp2FrameReader(),
- FRAME_LOGGER);
- frameWriter =
- new Http2OutboundFrameLogger(new DefaultHttp2FrameWriter(),
- FRAME_LOGGER);
- } else {
- frameReader = new DefaultHttp2FrameReader();
- frameWriter = new DefaultHttp2FrameWriter();
- }
- return new ServerHttp2ConnectionHandler(conn, frameReader, frameWriter,
- listener);
+ return Http2Util.create(conf, conn, listener, FACTORY,
+ LOG.isDebugEnabled() ? FRAME_LOGGER : null);
}
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/ServerHttp2EventListener.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/ServerHttp2EventListener.java
index 72e3879..f58f6a3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/ServerHttp2EventListener.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/ServerHttp2EventListener.java
@@ -17,17 +17,9 @@
*/
package org.apache.hadoop.hdfs.web.http2;
-import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
-import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.handler.codec.http2.Http2Connection;
-import io.netty.handler.codec.http2.Http2Connection.PropertyKey;
-import io.netty.handler.codec.http2.Http2Error;
-import io.netty.handler.codec.http2.Http2EventAdapter;
-import io.netty.handler.codec.http2.Http2Exception;
-import io.netty.handler.codec.http2.Http2Headers;
-import io.netty.handler.codec.http2.Http2Stream;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
@@ -35,35 +27,21 @@
/**
* An HTTP/2 FrameListener and EventListener to manage
- * {@link Http2StreamChannel}s.
- * <p>
- * We do not handle onRstStreamRead here, a stream that being reset will also
- * call onStreamClosed. The upper layer should not rely on a reset event.
+ * {@link Http2StreamChannel}s for server.
*/
@InterfaceAudience.Private
-public class ServerHttp2EventListener extends Http2EventAdapter {
-
- private final Channel parentChannel;
+public class ServerHttp2EventListener extends AbstractHttp2EventListener {
private final ChannelInitializer<Http2StreamChannel> subChannelInitializer;
- private final Http2Connection conn;
-
- private final PropertyKey subChannelPropKey;
-
public ServerHttp2EventListener(Channel parentChannel, Http2Connection conn,
ChannelInitializer<Http2StreamChannel> subChannelInitializer) {
- this.parentChannel = parentChannel;
- this.conn = conn;
+ super(parentChannel, conn);
this.subChannelInitializer = subChannelInitializer;
- this.subChannelPropKey = conn.newKey();
}
@Override
- public void onStreamActive(final Http2Stream stream) {
- Http2StreamChannel subChannel =
- new Http2StreamChannel(parentChannel, stream);
- stream.setProperty(subChannelPropKey, subChannel);
+ protected void initChannelOnStreamActive(final Http2StreamChannel subChannel) {
subChannel.pipeline().addFirst(subChannelInitializer);
parentChannel.eventLoop().register(subChannel)
.addListener(new FutureListener<Void>() {
@@ -71,65 +49,10 @@
@Override
public void operationComplete(Future<Void> future) throws Exception {
if (!future.isSuccess()) {
- stream.removeProperty(subChannelPropKey);
+ subChannel.stream().removeProperty(subChannelPropKey);
}
}
});
-
- }
-
- @Override
- public void onStreamClosed(Http2Stream stream) {
- Http2StreamChannel subChannel = stream.removeProperty(subChannelPropKey);
- if (subChannel != null) {
- subChannel.close();
- }
- }
-
- private Http2StreamChannel getSubChannel(int streamId) throws Http2Exception {
- Http2StreamChannel subChannel =
- conn.stream(streamId).getProperty(subChannelPropKey);
- if (subChannel == null) {
- throw Http2Exception.streamError(streamId, Http2Error.INTERNAL_ERROR,
- "No sub channel found");
- }
- return subChannel;
- }
-
- private void writeInbound(int streamId, Object msg, boolean endOfStream)
- throws Http2Exception {
- Http2StreamChannel subChannel = getSubChannel(streamId);
- subChannel.writeInbound(msg);
- if (endOfStream) {
- subChannel.writeInbound(LastHttp2Message.get());
- }
- if (subChannel.config().isAutoRead()) {
- subChannel.read();
- }
-
- }
-
- @Override
- public void onHeadersRead(ChannelHandlerContext ctx, int streamId,
- Http2Headers headers, int padding, boolean endOfStream)
- throws Http2Exception {
- writeInbound(streamId, headers, endOfStream);
- }
-
- @Override
- public void onHeadersRead(ChannelHandlerContext ctx, int streamId,
- Http2Headers headers, int streamDependency, short weight,
- boolean exclusive, int padding, boolean endOfStream)
- throws Http2Exception {
- onHeadersRead(ctx, streamId, headers, padding, endOfStream);
- }
-
- @Override
- public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data,
- int padding, boolean endOfStream) throws Http2Exception {
- int pendingBytes = data.readableBytes() + padding;
- writeInbound(streamId, data.retain(), endOfStream);
- return pendingBytes;
}
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/StartHttp2StreamRequest.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/StartHttp2StreamRequest.java
new file mode 100644
index 0000000..cda447b
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/StartHttp2StreamRequest.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.web.http2;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.handler.codec.http2.Http2Headers;
+import io.netty.util.concurrent.Promise;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Used by {@link Http2StreamBootstrap} to establish an
+ * {@link Http2StreamChannel}.
+ */
+@InterfaceAudience.Private
+class StartHttp2StreamRequest {
+
+ final Http2Headers headers;
+
+ final ByteBuf data;
+
+ final boolean endStream;
+
+ final Promise<Http2StreamChannel> promise;
+
+ StartHttp2StreamRequest(Http2Headers headers, ByteBuf data,
+ boolean endStream, Promise<Http2StreamChannel> promise) {
+ this.headers = headers;
+ this.data = data;
+ this.endStream = endStream;
+ this.promise = promise;
+ }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/Http2ResponseHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/Http2ResponseHandler.java
deleted file mode 100644
index 1e1acdd..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/Http2ResponseHandler.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.datanode.web.dtp;
-
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.SimpleChannelInboundHandler;
-import io.netty.handler.codec.http.FullHttpResponse;
-import io.netty.handler.codec.http2.HttpUtil;
-import io.netty.util.concurrent.Promise;
-
-import java.util.concurrent.ConcurrentMap;
-
-import net.sf.ehcache.store.chm.ConcurrentHashMap;
-
-public class Http2ResponseHandler extends
- SimpleChannelInboundHandler<FullHttpResponse> {
-
- private ConcurrentMap<Integer, Promise<FullHttpResponse>> streamId2Promise =
- new ConcurrentHashMap<>();
-
- @Override
- protected void channelRead0(ChannelHandlerContext ctx, FullHttpResponse msg)
- throws Exception {
- Integer streamId =
- msg.headers().getInt(HttpUtil.ExtensionHeaderNames.STREAM_ID.text());
- if (streamId == null) {
- System.err.println("HttpResponseHandler unexpected message received: "
- + msg);
- return;
- }
- if (streamId.intValue() == 1) {
- // this is the upgrade response message, just ignore it.
- return;
- }
- Promise<FullHttpResponse> promise = streamId2Promise.get(streamId);
- if (promise == null) {
- System.err.println("Message received for unknown stream id " + streamId);
- } else {
- // Do stuff with the message (for now just print it)
- promise.setSuccess(msg.retain());
-
- }
- }
-
- public void put(Integer streamId, Promise<FullHttpResponse> promise) {
- streamId2Promise.put(streamId, promise);
- }
-}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/TestDtpHttp2.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/TestDtpHttp2.java
index eaa63a4..4d4ac37 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/TestDtpHttp2.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/TestDtpHttp2.java
@@ -19,35 +19,16 @@
import static org.junit.Assert.assertEquals;
import io.netty.bootstrap.Bootstrap;
-import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
-import io.netty.handler.codec.http.DefaultFullHttpRequest;
-import io.netty.handler.codec.http.FullHttpRequest;
-import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpResponseStatus;
-import io.netty.handler.codec.http.HttpVersion;
-import io.netty.handler.codec.http2.DefaultHttp2Connection;
-import io.netty.handler.codec.http2.DefaultHttp2FrameReader;
-import io.netty.handler.codec.http2.DefaultHttp2FrameWriter;
-import io.netty.handler.codec.http2.DelegatingDecompressorFrameListener;
-import io.netty.handler.codec.http2.Http2Connection;
-import io.netty.handler.codec.http2.Http2ConnectionHandler;
-import io.netty.handler.codec.http2.Http2FrameLogger;
-import io.netty.handler.codec.http2.Http2FrameReader;
-import io.netty.handler.codec.http2.Http2FrameWriter;
-import io.netty.handler.codec.http2.Http2InboundFrameLogger;
-import io.netty.handler.codec.http2.Http2OutboundFrameLogger;
-import io.netty.handler.codec.http2.HttpToHttp2ConnectionHandler;
-import io.netty.handler.codec.http2.HttpUtil;
-import io.netty.handler.codec.http2.InboundHttp2ToHttpAdapter;
-import io.netty.handler.logging.LogLevel;
+import io.netty.handler.codec.http2.DefaultHttp2Headers;
import io.netty.handler.timeout.TimeoutException;
-import io.netty.util.concurrent.Promise;
+import io.netty.util.AsciiString;
import java.io.IOException;
import java.net.URISyntaxException;
@@ -57,14 +38,17 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.web.WebHdfsTestUtil;
+import org.apache.hadoop.hdfs.web.http2.ClientHttp2ConnectionHandler;
+import org.apache.hadoop.hdfs.web.http2.Http2DataReceiver;
+import org.apache.hadoop.hdfs.web.http2.Http2StreamBootstrap;
+import org.apache.hadoop.hdfs.web.http2.Http2StreamChannel;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
-public class TestDtpHttp2 {
+import com.google.common.io.ByteStreams;
- private static final Http2FrameLogger FRAME_LOGGER = new Http2FrameLogger(
- LogLevel.INFO, TestDtpHttp2.class);
+public class TestDtpHttp2 {
private static final Configuration CONF = WebHdfsTestUtil.createConf();
@@ -74,16 +58,15 @@
private static Channel CHANNEL;
- private static Http2ResponseHandler RESPONSE_HANDLER;
+ private static Http2StreamChannel STREAM;
@BeforeClass
public static void setUp() throws IOException, URISyntaxException,
- TimeoutException {
+ TimeoutException, InterruptedException, ExecutionException {
CLUSTER = new MiniDFSCluster.Builder(CONF).numDataNodes(1).build();
CLUSTER.waitActive();
- RESPONSE_HANDLER = new Http2ResponseHandler();
- Bootstrap bootstrap =
+ CHANNEL =
new Bootstrap()
.group(WORKER_GROUP)
.channel(NioSocketChannel.class)
@@ -93,22 +76,31 @@
@Override
protected void initChannel(Channel ch) throws Exception {
- Http2Connection connection = new DefaultHttp2Connection(false);
- Http2ConnectionHandler connectionHandler =
- new HttpToHttp2ConnectionHandler(connection, frameReader(),
- frameWriter(), new DelegatingDecompressorFrameListener(
- connection, new InboundHttp2ToHttpAdapter.Builder(
- connection).maxContentLength(Integer.MAX_VALUE)
- .propagateSettings(true).build()));
- ch.pipeline().addLast(connectionHandler, RESPONSE_HANDLER);
+ ch.pipeline().addLast(
+ ClientHttp2ConnectionHandler.create(ch, CONF));
}
- });
- CHANNEL = bootstrap.connect().syncUninterruptibly().channel();
+ }).connect().syncUninterruptibly().channel();
+ STREAM =
+ new Http2StreamBootstrap()
+ .channel(CHANNEL)
+ .headers(
+ new DefaultHttp2Headers().method(
+ new AsciiString(HttpMethod.GET.name())).path(
+ new AsciiString("/"))).endStream(true)
+ .handler(new ChannelInitializer<Channel>() {
+ @Override
+ protected void initChannel(Channel ch) throws Exception {
+ ch.pipeline().addLast(new Http2DataReceiver());
+ }
+ }).connect().syncUninterruptibly().get();
}
@AfterClass
public static void tearDown() throws IOException {
+ if (STREAM != null) {
+ STREAM.close();
+ }
if (CHANNEL != null) {
CHANNEL.close().syncUninterruptibly();
}
@@ -118,28 +110,14 @@
}
}
- private static Http2FrameReader frameReader() {
- return new Http2InboundFrameLogger(new DefaultHttp2FrameReader(),
- FRAME_LOGGER);
- }
-
- private static Http2FrameWriter frameWriter() {
- return new Http2OutboundFrameLogger(new DefaultHttp2FrameWriter(),
- FRAME_LOGGER);
- }
-
@Test
- public void test() throws InterruptedException, ExecutionException {
- int streamId = 3;
- FullHttpRequest request =
- new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/");
- request.headers().add(HttpUtil.ExtensionHeaderNames.STREAM_ID.text(),
- streamId);
- Promise<FullHttpResponse> promise = CHANNEL.eventLoop().newPromise();
- RESPONSE_HANDLER.put(streamId, promise);
- CHANNEL.writeAndFlush(request);
- assertEquals(HttpResponseStatus.OK, promise.get().status());
- ByteBuf content = promise.get().content();
- assertEquals("HTTP/2 DTP", content.toString(StandardCharsets.UTF_8));
+ public void test() throws InterruptedException, ExecutionException,
+ IOException {
+ Http2DataReceiver receiver = STREAM.pipeline().get(Http2DataReceiver.class);
+ assertEquals(HttpResponseStatus.OK.codeAsText(), receiver.waitForResponse()
+ .status());
+ assertEquals("HTTP/2 DTP",
+ new String(ByteStreams.toByteArray(receiver.content()),
+ StandardCharsets.UTF_8));
}
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/http2/AbstractTestHttp2Client.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/http2/AbstractTestHttp2Client.java
new file mode 100644
index 0000000..d7c6d16
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/http2/AbstractTestHttp2Client.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.web.http2;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.http.HttpMethod;
+import io.netty.handler.codec.http2.DefaultHttp2Headers;
+import io.netty.util.ByteString;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.ExecutionException;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.eclipse.jetty.http2.server.HTTP2CServerConnectionFactory;
+import org.eclipse.jetty.server.HttpConfiguration;
+import org.eclipse.jetty.server.Request;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.ServerConnector;
+import org.eclipse.jetty.server.handler.AbstractHandler;
+
+public abstract class AbstractTestHttp2Client {
+
+ protected EventLoopGroup workerGroup = new NioEventLoopGroup();
+
+ protected Server server;
+
+ protected final class EchoHandler extends AbstractHandler {
+
+ @Override
+ public void handle(String target, Request baseRequest,
+ HttpServletRequest request, HttpServletResponse response)
+ throws IOException, ServletException {
+ byte[] msg = IOUtils.toByteArray(request.getInputStream());
+ response.getOutputStream().write(msg);
+ response.getOutputStream().flush();
+ }
+
+ }
+
+ protected Channel channel;
+
+ protected void start() throws Exception {
+ server = new Server();
+ ServerConnector connector =
+ new ServerConnector(server, new HTTP2CServerConnectionFactory(
+ new HttpConfiguration()));
+ connector.setPort(0);
+ server.addConnector(connector);
+ setHandler(server);
+ server.start();
+ channel =
+ new Bootstrap()
+ .group(workerGroup)
+ .channel(NioSocketChannel.class)
+ .handler(new ChannelInitializer<Channel>() {
+
+ @Override
+ protected void initChannel(Channel ch) throws Exception {
+ ch.pipeline().addLast(
+ ClientHttp2ConnectionHandler.create(ch, new Configuration()));
+ }
+
+ })
+ .connect(
+ new InetSocketAddress("127.0.0.1", connector.getLocalPort()))
+ .sync().channel();
+ }
+
+ protected void stop() throws Exception {
+ if (channel != null) {
+ channel.close();
+ }
+ if (server != null) {
+ server.stop();
+ }
+ workerGroup.shutdownGracefully();
+ }
+
+ protected Http2StreamChannel connect(boolean endStream)
+ throws InterruptedException, ExecutionException {
+ return new Http2StreamBootstrap()
+ .channel(channel)
+ .handler(new ChannelInitializer<Http2StreamChannel>() {
+
+ @Override
+ protected void initChannel(Http2StreamChannel ch) throws Exception {
+ ch.pipeline().addLast(new Http2DataReceiver());
+ }
+
+ })
+ .headers(
+ new DefaultHttp2Headers()
+ .method(
+ new ByteString(HttpMethod.GET.name(), StandardCharsets.UTF_8))
+ .path(new ByteString("/", StandardCharsets.UTF_8))
+ .scheme(new ByteString("http", StandardCharsets.UTF_8))
+ .authority(
+ new ByteString("127.0.0.1:"
+ + ((InetSocketAddress) channel.remoteAddress()).getPort(),
+ StandardCharsets.UTF_8))).endStream(endStream).connect()
+ .sync().get();
+ }
+
+ protected abstract void setHandler(Server server);
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/http2/TestHttp2Client.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/http2/TestHttp2Client.java
new file mode 100644
index 0000000..ab3f6d4
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/http2/TestHttp2Client.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.web.http2;
+
+import static org.junit.Assert.assertEquals;
+import io.netty.handler.codec.http.HttpResponseStatus;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.ExecutionException;
+
+import org.eclipse.jetty.server.Server;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.io.ByteStreams;
+
+public class TestHttp2Client extends AbstractTestHttp2Client {
+
+ @Before
+ public void setUp() throws Exception {
+ start();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ stop();
+ }
+
+ @Test
+ public void test() throws InterruptedException, ExecutionException,
+ IOException {
+ Http2StreamChannel stream = connect(false);
+ Http2DataReceiver receiver = stream.pipeline().get(Http2DataReceiver.class);
+ stream.write(stream.alloc().buffer()
+ .writeBytes("Hello World".getBytes(StandardCharsets.UTF_8)));
+ stream.writeAndFlush(LastHttp2Message.get());
+ assertEquals(receiver.waitForResponse().status(),
+ HttpResponseStatus.OK.codeAsText());
+ assertEquals("Hello World",
+ new String(ByteStreams.toByteArray(receiver.content()),
+ StandardCharsets.UTF_8));
+ }
+
+ @Override
+ protected void setHandler(Server server) {
+ server.setHandler(new EchoHandler());
+ }
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/http2/TestHttp2ClientMultiThread.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/http2/TestHttp2ClientMultiThread.java
new file mode 100644
index 0000000..1c483c1
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/http2/TestHttp2ClientMultiThread.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.web.http2;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import io.netty.handler.codec.http.HttpResponseStatus;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.eclipse.jetty.server.Server;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.io.ByteStreams;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+public class TestHttp2ClientMultiThread extends AbstractTestHttp2Client {
+
+ private int requestCount = 10000;
+
+ private int concurrency = 10;
+
+ private ExecutorService executor = Executors.newFixedThreadPool(concurrency,
+ new ThreadFactoryBuilder().setNameFormat("Echo-Client-%d").setDaemon(true)
+ .build());
+
+ @Before
+ public void setUp() throws Exception {
+ start();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ stop();
+ }
+
+ private void testEcho() throws InterruptedException, ExecutionException,
+ IOException {
+ Http2StreamChannel stream = connect(false);
+ Http2DataReceiver receiver = stream.pipeline().get(Http2DataReceiver.class);
+ byte[] b = new byte[ThreadLocalRandom.current().nextInt(10, 100)];
+ ThreadLocalRandom.current().nextBytes(b);
+ stream.write(stream.alloc().buffer(b.length).writeBytes(b));
+ stream.writeAndFlush(LastHttp2Message.get());
+ assertEquals(receiver.waitForResponse().status(),
+ HttpResponseStatus.OK.codeAsText());
+ assertArrayEquals(b, ByteStreams.toByteArray(receiver.content()));
+ }
+
+ @Test
+ public void test() throws InterruptedException {
+ final AtomicBoolean succ = new AtomicBoolean(true);
+ for (int i = 0; i < requestCount; i++) {
+ executor.execute(new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ testEcho();
+ } catch (Throwable t) {
+ t.printStackTrace();
+ succ.set(false);
+ }
+ }
+ });
+ }
+ executor.shutdown();
+ assertTrue(executor.awaitTermination(5, TimeUnit.MINUTES));
+ assertTrue(succ.get());
+ }
+
+ @Override
+ protected void setHandler(Server server) {
+ server.setHandler(new EchoHandler());
+ }
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/http2/TestHttp2DataReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/http2/TestHttp2DataReceiver.java
new file mode 100644
index 0000000..601f7c9
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/http2/TestHttp2DataReceiver.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.web.http2;
+
+import static org.junit.Assert.assertEquals;
+import io.netty.handler.codec.http.HttpResponseStatus;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ThreadLocalRandom;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.eclipse.jetty.server.Request;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.handler.AbstractHandler;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.io.ByteStreams;
+import com.google.common.io.Files;
+
+public class TestHttp2DataReceiver extends AbstractTestHttp2Client {
+
+ private File largeFile = new File(".largeFile");
+
+ @Before
+ public void setUp() throws Exception {
+ byte[] b = new byte[64 * 1024];
+ try (FileOutputStream out = new FileOutputStream(largeFile)) {
+ for (int i = 0; i < 1024; i++) {
+ ThreadLocalRandom.current().nextBytes(b);
+ out.write(b);
+ }
+ }
+ largeFile.deleteOnExit();
+ start();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ stop();
+ largeFile.delete();
+ }
+
+ @Override
+ protected void setHandler(Server server) {
+ server.setHandler(new AbstractHandler() {
+
+ @Override
+ public void handle(String target, Request baseRequest,
+ HttpServletRequest request, HttpServletResponse response)
+ throws IOException, ServletException {
+ Files.copy(largeFile, response.getOutputStream());
+ response.getOutputStream().flush();
+ }
+ });
+ }
+
+ private void assertContentEquals(byte[] expected, byte[] actual, int length) {
+ for (int i = 0; i < length; i++) {
+ assertEquals("differ at index " + i + ", expected " + expected[i]
+ + ", actual " + actual[i], expected[i], actual[i]);
+ }
+ }
+
+ @Test
+ public void test() throws InterruptedException, ExecutionException,
+ IOException {
+ Http2StreamChannel stream = connect(true);
+ Http2DataReceiver receiver = stream.pipeline().get(Http2DataReceiver.class);
+ assertEquals(HttpResponseStatus.OK.codeAsText(), receiver.waitForResponse()
+ .status());
+ byte[] buf = new byte[4 * 1024];
+ byte[] fileBuf = new byte[buf.length];
+ try (InputStream in = receiver.content();
+ FileInputStream fileIn = new FileInputStream(largeFile)) {
+ for (;;) {
+ int read = in.read(buf);
+ if (read == -1) {
+ assertEquals(-1, fileIn.read());
+ break;
+ }
+ ByteStreams.readFully(fileIn, fileBuf, 0, read);
+ assertContentEquals(fileBuf, buf, read);
+ }
+ }
+ }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/http2/TestHttp2Server.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/http2/TestHttp2Server.java
index 6a8495b..b6c197d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/http2/TestHttp2Server.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/http2/TestHttp2Server.java
@@ -35,6 +35,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.conf.Configuration;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpMethod;
@@ -92,7 +93,7 @@
throws Exception {
ch.pipeline().addLast(new HelloWorldHandler());
}
- }));
+ }, new Configuration()));
}
}).bind(0).syncUninterruptibly().channel();
@@ -137,4 +138,4 @@
Thread.sleep(1000);
assertEquals(2, handlerClosedCount.get());
}
-}
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/http2/TestHttp2ServerMultiThread.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/http2/TestHttp2ServerMultiThread.java
index e583ca3..d2fdf0f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/http2/TestHttp2ServerMultiThread.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/http2/TestHttp2ServerMultiThread.java
@@ -44,6 +44,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.conf.Configuration;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpMethod;
@@ -83,8 +84,7 @@
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg)
throws Exception {
- ByteBuf out = msg.readBytes(msg.readableBytes());
- ctx.writeAndFlush(out);
+ ctx.writeAndFlush(msg.readBytes(msg.readableBytes()));
}
@Override
@@ -133,7 +133,7 @@
throws Exception {
ch.pipeline().addLast(new DispatchHandler());
}
- }));
+ }, new Configuration()));
}
}).bind(0).syncUninterruptibly().channel();
@@ -203,5 +203,4 @@
Thread.sleep(1000);
assertEquals(requestCount, handlerClosedCount.get());
}
-
-}
+}
\ No newline at end of file
diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml
index e70acca..1c2cfad 100644
--- a/hadoop-project/pom.xml
+++ b/hadoop-project/pom.xml
@@ -465,6 +465,11 @@
<version>2.5</version>
</dependency>
<dependency>
+ <groupId>javax.servlet</groupId>
+ <artifactId>javax.servlet-api</artifactId>
+ <version>3.1.0</version>
+ </dependency>
+ <dependency>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty</artifactId>
<version>${jetty.version}</version>
@@ -585,14 +590,19 @@
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
- <version>4.1.0.Beta5</version>
+ <version>4.1.0.Beta6</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty.http2</groupId>
<artifactId>http2-client</artifactId>
<version>9.3.0.M2</version>
- <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.eclipse.jetty.http2</groupId>
+ <artifactId>http2-server</artifactId>
+ <version>9.3.0.M2</version>
</dependency>
<dependency>