blob: 3cc56598021d9a30d5ec8bc48bd798b0f6dc3bd0 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.ipc;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.ipc.RPC.RpcKind;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.protobuf.ProtobufRpcEngineProtos.RequestHeaderProto;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto.OperationProto;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto;
import org.apache.hadoop.util.ProtoUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.protobuf.CodedOutputStream;
import com.google.protobuf.Message;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.ByteBufOutputStream;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
@InterfaceAudience.Private
class RpcDuplexHandler extends ChannelDuplexHandler {
private static final Logger LOG =
LoggerFactory.getLogger(RpcDuplexHandler.class);
private final RpcConnection conn;
private final Map<Integer, Call> id2Call = new HashMap<>();
public RpcDuplexHandler(RpcConnection conn) {
this.conn = conn;
}
private void writeRequest(ChannelHandlerContext ctx, Call call,
ChannelPromise promise) throws IOException {
id2Call.put(call.getId(), call);
RpcRequestHeaderProto rpcHeader = ProtoUtil.makeRpcRequestHeader(
RpcKind.RPC_PROTOCOL_BUFFER, OperationProto.RPC_FINAL_PACKET,
call.getId(), 0, conn.rpcClient.getClientId());
int rpcHeaderSize = rpcHeader.getSerializedSize();
RequestHeaderProto requestHeader =
RequestHeaderProto.newBuilder().setMethodName(call.getMethodName())
.setDeclaringClassProtocolName(call.getProtocolName())
.setClientProtocolVersion(call.getProtocolVersion()).build();
int requestHeaderSize = requestHeader.getSerializedSize();
int totalSize = CodedOutputStream.computeRawVarint32Size(rpcHeaderSize) +
rpcHeaderSize +
CodedOutputStream.computeRawVarint32Size(requestHeaderSize) +
requestHeaderSize;
Message param = call.getParam();
if (param != null) {
int paramSize = param.getSerializedSize();
totalSize +=
CodedOutputStream.computeRawVarint32Size(paramSize) + paramSize;
}
ByteBufOutputStream out =
new ByteBufOutputStream(ctx.alloc().buffer(totalSize + 4));
out.writeInt(totalSize);
rpcHeader.writeDelimitedTo(out);
requestHeader.writeDelimitedTo(out);
if (param != null) {
param.writeDelimitedTo(out);
}
ctx.write(out.buffer(), promise);
}
@Override
public void write(ChannelHandlerContext ctx, Object msg,
ChannelPromise promise) throws Exception {
if (msg instanceof Call) {
writeRequest(ctx, (Call) msg, promise);
} else {
ctx.write(msg, promise);
}
}
private void readResponse(ChannelHandlerContext ctx, ByteBuf buf)
throws Exception {
ByteBufInputStream in = new ByteBufInputStream(buf);
RpcResponseHeaderProto header =
RpcResponseHeaderProto.parseDelimitedFrom(in);
int id = header.getCallId();
RpcStatusProto status = header.getStatus();
if (status != RpcStatusProto.SUCCESS) {
String exceptionClassName =
header.hasExceptionClassName() ? header.getExceptionClassName()
: "ServerDidNotSetExceptionClassName";
String errorMsg = header.hasErrorMsg() ? header.getErrorMsg()
: "ServerDidNotSetErrorMsg";
RpcErrorCodeProto errCode =
(header.hasErrorDetail() ? header.getErrorDetail() : null);
if (errCode == null) {
LOG.warn("Detailed error code not set by server on rpc error");
}
RemoteException re =
new RemoteException(exceptionClassName, errorMsg, errCode);
if (status == RpcStatusProto.ERROR) {
Call call = id2Call.remove(id);
call.setException(re);
} else if (status == RpcStatusProto.FATAL) {
exceptionCaught(ctx, re);
}
return;
}
Call call = id2Call.remove(id);
call.setResponse(call.getResponseDefaultType().getParserForType()
.parseDelimitedFrom(in));
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
try {
readResponse(ctx, buf);
} finally {
buf.release();
}
}
}
private void cleanupCalls(ChannelHandlerContext ctx, IOException error) {
for (Call call : id2Call.values()) {
call.setException(error);
}
id2Call.clear();
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
if (!id2Call.isEmpty()) {
cleanupCalls(ctx, new IOException("Connection closed"));
}
conn.shutdown();
ctx.fireChannelInactive();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
if (!id2Call.isEmpty()) {
cleanupCalls(ctx, new IOException("Connection closed"));
}
conn.shutdown();
}
}