| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hbase.ipc; |
| |
| import io.opentelemetry.context.Scope; |
| import java.io.IOException; |
| import java.util.HashMap; |
| import java.util.Map; |
| import org.apache.hadoop.hbase.CellScanner; |
| import org.apache.hadoop.hbase.codec.Codec; |
| import org.apache.hadoop.hbase.exceptions.ConnectionClosedException; |
| import org.apache.hadoop.io.compress.CompressionCodec; |
| import org.apache.hadoop.ipc.RemoteException; |
| import org.apache.yetus.audience.InterfaceAudience; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.hbase.thirdparty.com.google.protobuf.Message; |
| import org.apache.hbase.thirdparty.com.google.protobuf.Message.Builder; |
| import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat; |
| import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf; |
| import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufInputStream; |
| import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufOutputStream; |
| import org.apache.hbase.thirdparty.io.netty.channel.ChannelDuplexHandler; |
| import org.apache.hbase.thirdparty.io.netty.channel.ChannelFuture; |
| import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext; |
| import org.apache.hbase.thirdparty.io.netty.channel.ChannelPromise; |
| import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateEvent; |
| import org.apache.hbase.thirdparty.io.netty.util.concurrent.PromiseCombiner; |
| |
| import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta; |
| import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse; |
| import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; |
| import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ResponseHeader; |
| |
| /** |
| * The netty rpc handler. |
| * @since 2.0.0 |
| */ |
| @InterfaceAudience.Private |
| class NettyRpcDuplexHandler extends ChannelDuplexHandler { |
| |
| private static final Logger LOG = LoggerFactory.getLogger(NettyRpcDuplexHandler.class); |
| |
| private final NettyRpcConnection conn; |
| |
| private final CellBlockBuilder cellBlockBuilder; |
| |
| private final Codec codec; |
| |
| private final CompressionCodec compressor; |
| |
| private final Map<Integer, Call> id2Call = new HashMap<>(); |
| |
| public NettyRpcDuplexHandler(NettyRpcConnection conn, CellBlockBuilder cellBlockBuilder, |
| Codec codec, CompressionCodec compressor) { |
| this.conn = conn; |
| this.cellBlockBuilder = cellBlockBuilder; |
| this.codec = codec; |
| this.compressor = compressor; |
| |
| } |
| |
| private void writeRequest(ChannelHandlerContext ctx, Call call, ChannelPromise promise) |
| throws IOException { |
| id2Call.put(call.id, call); |
| ByteBuf cellBlock = cellBlockBuilder.buildCellBlock(codec, compressor, call.cells, ctx.alloc()); |
| CellBlockMeta cellBlockMeta; |
| if (cellBlock != null) { |
| CellBlockMeta.Builder cellBlockMetaBuilder = CellBlockMeta.newBuilder(); |
| cellBlockMetaBuilder.setLength(cellBlock.writerIndex()); |
| cellBlockMeta = cellBlockMetaBuilder.build(); |
| } else { |
| cellBlockMeta = null; |
| } |
| RequestHeader requestHeader = IPCUtil.buildRequestHeader(call, cellBlockMeta); |
| int sizeWithoutCellBlock = IPCUtil.getTotalSizeWhenWrittenDelimited(requestHeader, call.param); |
| int totalSize = cellBlock != null ? sizeWithoutCellBlock + cellBlock.writerIndex() |
| : sizeWithoutCellBlock; |
| ByteBuf buf = ctx.alloc().buffer(sizeWithoutCellBlock + 4); |
| buf.writeInt(totalSize); |
| try (ByteBufOutputStream bbos = new ByteBufOutputStream(buf)) { |
| requestHeader.writeDelimitedTo(bbos); |
| if (call.param != null) { |
| call.param.writeDelimitedTo(bbos); |
| } |
| if (cellBlock != null) { |
| ChannelPromise withoutCellBlockPromise = ctx.newPromise(); |
| ctx.write(buf, withoutCellBlockPromise); |
| ChannelPromise cellBlockPromise = ctx.newPromise(); |
| ctx.write(cellBlock, cellBlockPromise); |
| PromiseCombiner combiner = new PromiseCombiner(ctx.executor()); |
| combiner.addAll((ChannelFuture) withoutCellBlockPromise, cellBlockPromise); |
| combiner.finish(promise); |
| } else { |
| ctx.write(buf, promise); |
| } |
| } |
| } |
| |
| @Override |
| public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) |
| throws Exception { |
| if (msg instanceof Call) { |
| Call call = (Call) msg; |
| try (Scope scope = call.span.makeCurrent()) { |
| writeRequest(ctx, call, promise); |
| } |
| } else { |
| ctx.write(msg, promise); |
| } |
| } |
| |
| private void readResponse(ChannelHandlerContext ctx, ByteBuf buf) throws IOException { |
| int totalSize = buf.readInt(); |
| ByteBufInputStream in = new ByteBufInputStream(buf); |
| ResponseHeader responseHeader = ResponseHeader.parseDelimitedFrom(in); |
| int id = responseHeader.getCallId(); |
| if (LOG.isTraceEnabled()) { |
| LOG.trace("got response header " + TextFormat.shortDebugString(responseHeader) |
| + ", totalSize: " + totalSize + " bytes"); |
| } |
| RemoteException remoteExc; |
| if (responseHeader.hasException()) { |
| ExceptionResponse exceptionResponse = responseHeader.getException(); |
| remoteExc = IPCUtil.createRemoteException(exceptionResponse); |
| if (IPCUtil.isFatalConnectionException(exceptionResponse)) { |
| // Here we will cleanup all calls so do not need to fall back, just return. |
| exceptionCaught(ctx, remoteExc); |
| return; |
| } |
| } else { |
| remoteExc = null; |
| } |
| Call call = id2Call.remove(id); |
| if (call == null) { |
| // So we got a response for which we have no corresponding 'call' here on the client-side. |
| // We probably timed out waiting, cleaned up all references, and now the server decides |
| // to return a response. There is nothing we can do w/ the response at this stage. Clean |
| // out the wire of the response so its out of the way and we can get other responses on |
| // this connection. |
| if (LOG.isDebugEnabled()) { |
| int readSoFar = IPCUtil.getTotalSizeWhenWrittenDelimited(responseHeader); |
| int whatIsLeftToRead = totalSize - readSoFar; |
| LOG.debug("Unknown callId: " + id + ", skipping over this response of " + whatIsLeftToRead |
| + " bytes"); |
| } |
| return; |
| } |
| if (remoteExc != null) { |
| call.setException(remoteExc); |
| return; |
| } |
| Message value; |
| if (call.responseDefaultType != null) { |
| Builder builder = call.responseDefaultType.newBuilderForType(); |
| builder.mergeDelimitedFrom(in); |
| value = builder.build(); |
| } else { |
| value = null; |
| } |
| CellScanner cellBlockScanner; |
| if (responseHeader.hasCellBlockMeta()) { |
| int size = responseHeader.getCellBlockMeta().getLength(); |
| // Maybe we could read directly from the ByteBuf. |
| // The problem here is that we do not know when to release it. |
| byte[] cellBlock = new byte[size]; |
| buf.readBytes(cellBlock); |
| cellBlockScanner = cellBlockBuilder.createCellScanner(this.codec, this.compressor, cellBlock); |
| } else { |
| cellBlockScanner = null; |
| } |
| call.setResponse(value, cellBlockScanner); |
| } |
| |
| @Override |
| public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { |
| if (msg instanceof ByteBuf) { |
| ByteBuf buf = (ByteBuf) msg; |
| try { |
| readResponse(ctx, buf); |
| } finally { |
| buf.release(); |
| } |
| } else { |
| super.channelRead(ctx, msg); |
| } |
| } |
| |
| private void cleanupCalls(ChannelHandlerContext ctx, IOException error) { |
| for (Call call : id2Call.values()) { |
| call.setException(error); |
| } |
| id2Call.clear(); |
| } |
| |
| @Override |
| public void channelInactive(ChannelHandlerContext ctx) throws Exception { |
| if (!id2Call.isEmpty()) { |
| cleanupCalls(ctx, new ConnectionClosedException("Connection closed")); |
| } |
| conn.shutdown(); |
| ctx.fireChannelInactive(); |
| } |
| |
| @Override |
| public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { |
| if (!id2Call.isEmpty()) { |
| cleanupCalls(ctx, IPCUtil.toIOE(cause)); |
| } |
| conn.shutdown(); |
| } |
| |
| @Override |
| public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { |
| if (evt instanceof IdleStateEvent) { |
| IdleStateEvent idleEvt = (IdleStateEvent) evt; |
| switch (idleEvt.state()) { |
| case WRITER_IDLE: |
| if (id2Call.isEmpty()) { |
| if (LOG.isTraceEnabled()) { |
| LOG.trace("shutdown connection to " + conn.remoteId().address |
| + " because idle for a long time"); |
| } |
| // It may happen that there are still some pending calls in the event loop queue and |
| // they will get a closed channel exception. But this is not a big deal as it rarely |
| // rarely happens and the upper layer could retry immediately. |
| conn.shutdown(); |
| } |
| break; |
| default: |
| LOG.warn("Unrecognized idle state " + idleEvt.state()); |
| break; |
| } |
| } else if (evt instanceof CallEvent) { |
| // just remove the call for now until we add other call event other than timeout and cancel. |
| id2Call.remove(((CallEvent) evt).call.id); |
| } else { |
| ctx.fireUserEventTriggered(evt); |
| } |
| } |
| } |