blob: 42be10db57bf70f8fe0f3957f8ccf79298c1e7e6 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.web.http2;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.ChannelPromiseAggregator;
import io.netty.handler.codec.UnsupportedMessageTypeException;
import io.netty.handler.codec.http2.DefaultHttp2Connection;
import io.netty.handler.codec.http2.Http2Connection;
import io.netty.handler.codec.http2.Http2ConnectionDecoder;
import io.netty.handler.codec.http2.Http2ConnectionEncoder;
import io.netty.handler.codec.http2.Http2ConnectionHandler;
import io.netty.handler.codec.http2.Http2Exception;
import io.netty.handler.codec.http2.Http2FrameLogger;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.logging.LogLevel;
import io.netty.util.concurrent.Promise;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
/**
* An {@link Http2ConnectionHandler} used at client side.
*/
@InterfaceAudience.Private
public class ClientHttp2ConnectionHandler extends Http2ConnectionHandler {
private static final Log LOG = LogFactory
.getLog(ClientHttp2ConnectionHandler.class);
private static final Http2FrameLogger FRAME_LOGGER = new Http2FrameLogger(
LogLevel.INFO, ClientHttp2ConnectionHandler.class);
private AtomicInteger nextStreamId = new AtomicInteger(3);
private final ClientHttp2EventListener listener;
private ClientHttp2ConnectionHandler(Http2ConnectionDecoder decoder,
Http2ConnectionEncoder encoder) {
super(decoder, encoder);
this.listener = (ClientHttp2EventListener) decoder.listener();
}
private int nextStreamId() {
return nextStreamId.getAndAdd(2);
}
private void writeHeaders(ChannelHandlerContext ctx,
Http2ConnectionEncoder encoder, final int streamId, Http2Headers headers,
boolean endStream, ChannelPromise promise,
final Promise<Http2StreamChannel> callback) {
encoder.writeHeaders(ctx, streamId, headers, 0, endStream, promise)
.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
callback
.setSuccess(connection().stream(streamId)
.<Http2StreamChannel> getProperty(
listener.subChannelPropKey));
} else {
callback.setFailure(future.cause());
}
}
});
}
@Override
public void write(ChannelHandlerContext ctx, Object msg,
ChannelPromise promise) throws Exception {
if (msg instanceof StartHttp2StreamRequest) {
final StartHttp2StreamRequest request = (StartHttp2StreamRequest) msg;
final int streamId = nextStreamId();
Http2ConnectionEncoder encoder = encoder();
if (request.data.isReadable()) {
ChannelPromiseAggregator aggregator =
new ChannelPromiseAggregator(promise);
ChannelPromise headerPromise = ctx.newPromise();
aggregator.add(headerPromise);
writeHeaders(ctx, encoder, streamId, request.headers, false,
headerPromise, request.promise);
ChannelPromise dataPromise = ctx.newPromise();
aggregator.add(dataPromise);
encoder.writeData(ctx, streamId, request.data, 0, request.endStream,
dataPromise);
} else {
writeHeaders(ctx, encoder, streamId, request.headers,
request.endStream, promise, request.promise);
}
} else {
throw new UnsupportedMessageTypeException(msg,
StartHttp2StreamRequest.class);
}
}
public int numActiveStreams() {
return listener.numActiveStreams();
}
public int currentStreamId() {
return nextStreamId.get();
}
private static final Http2Util.Http2ConnectionHandlerFactory<ClientHttp2ConnectionHandler> FACTORY =
new Http2Util.Http2ConnectionHandlerFactory<ClientHttp2ConnectionHandler>() {
@Override
public ClientHttp2ConnectionHandler create(
Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder) {
return new ClientHttp2ConnectionHandler(decoder, encoder);
}
};
public static ClientHttp2ConnectionHandler create(Channel channel,
Configuration conf) throws Http2Exception {
Http2Connection conn = new DefaultHttp2Connection(false);
ClientHttp2EventListener listener =
new ClientHttp2EventListener(channel, conn);
return Http2Util.create(conf, conn, listener, FACTORY,
LOG.isDebugEnabled() ? FRAME_LOGGER : null);
}
}