| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.ratis.grpc; |
| |
| import org.apache.ratis.protocol.RaftClientReply; |
| import org.apache.ratis.protocol.exceptions.ServerNotReadyException; |
| import org.apache.ratis.protocol.exceptions.TimeoutIOException; |
| import org.apache.ratis.thirdparty.io.grpc.ManagedChannel; |
| import org.apache.ratis.thirdparty.io.grpc.Metadata; |
| import org.apache.ratis.thirdparty.io.grpc.Status; |
| import org.apache.ratis.thirdparty.io.grpc.StatusRuntimeException; |
| import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver; |
| import org.apache.ratis.util.IOUtils; |
| import org.apache.ratis.util.JavaUtils; |
| import org.apache.ratis.util.LogUtils; |
| import org.apache.ratis.util.ReflectionUtils; |
| import org.apache.ratis.util.function.CheckedSupplier; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.io.IOException; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.TimeUnit; |
| import java.util.function.Function; |
| import java.util.function.Supplier; |
| |
| public interface GrpcUtil { |
| static final Logger LOG = LoggerFactory.getLogger(GrpcUtil.class); |
| |
| Metadata.Key<String> EXCEPTION_TYPE_KEY = |
| Metadata.Key.of("exception-type", Metadata.ASCII_STRING_MARSHALLER); |
| Metadata.Key<byte[]> EXCEPTION_OBJECT_KEY = |
| Metadata.Key.of("exception-object-bin", Metadata.BINARY_BYTE_MARSHALLER); |
| Metadata.Key<String> CALL_ID = |
| Metadata.Key.of("call-id", Metadata.ASCII_STRING_MARSHALLER); |
| Metadata.Key<String> HEARTBEAT = |
| Metadata.Key.of("heartbeat", Metadata.ASCII_STRING_MARSHALLER); |
| |
| static StatusRuntimeException wrapException(Throwable t) { |
| return wrapException(t, -1); |
| } |
| |
| static StatusRuntimeException wrapException(Throwable t, long callId) { |
| t = JavaUtils.unwrapCompletionException(t); |
| Metadata trailers = new StatusRuntimeExceptionMetadataBuilder(t) |
| .addCallId(callId) |
| .build(); |
| return wrapException(t, trailers); |
| } |
| |
| static StatusRuntimeException wrapException(Throwable t, long callId, boolean isHeartbeat) { |
| t = JavaUtils.unwrapCompletionException(t); |
| Metadata trailers = new StatusRuntimeExceptionMetadataBuilder(t) |
| .addCallId(callId) |
| .addIsHeartbeat(isHeartbeat) |
| .build(); |
| return wrapException(t, trailers); |
| } |
| |
| static StatusRuntimeException wrapException(Throwable t, Metadata trailers) { |
| return new StatusRuntimeException( |
| Status.INTERNAL.withCause(t).withDescription(t.getMessage()), trailers); |
| } |
| |
| static Throwable unwrapThrowable(Throwable t) { |
| if (t instanceof StatusRuntimeException) { |
| final IOException ioe = tryUnwrapException((StatusRuntimeException)t); |
| if (ioe != null) { |
| return ioe; |
| } |
| } |
| return t; |
| } |
| |
| static IOException unwrapException(StatusRuntimeException se) { |
| final IOException ioe = tryUnwrapException(se); |
| return ioe != null? ioe: new IOException(se); |
| } |
| |
| static IOException tryUnwrapException(StatusRuntimeException se) { |
| final Status status = se.getStatus(); |
| if (status != null && status.getCode() == Status.Code.DEADLINE_EXCEEDED) { |
| return new TimeoutIOException(status.getDescription(), se); |
| } |
| |
| final Metadata trailers = se.getTrailers(); |
| if (trailers == null) { |
| return null; |
| } |
| |
| final byte[] bytes = trailers.get(EXCEPTION_OBJECT_KEY); |
| if (bytes != null) { |
| try { |
| return IOUtils.bytes2Object(bytes, IOException.class); |
| } catch (Exception e) { |
| se.addSuppressed(e); |
| } |
| } |
| |
| if (status != null) { |
| final String className = trailers.get(EXCEPTION_TYPE_KEY); |
| if (className != null) { |
| try { |
| final Class<? extends Throwable> clazz = Class.forName(className).asSubclass(Throwable.class); |
| final Throwable unwrapped = ReflectionUtils.instantiateException(clazz, status.getDescription()); |
| return IOUtils.asIOException(unwrapped.initCause(se)); |
| } catch (Throwable e) { |
| se.addSuppressed(e); |
| return new IOException(se); |
| } |
| } |
| } |
| return null; |
| } |
| |
| static long getCallId(Throwable t) { |
| if (t instanceof StatusRuntimeException) { |
| final Metadata trailers = ((StatusRuntimeException)t).getTrailers(); |
| String callId = trailers.get(CALL_ID); |
| return callId != null ? Integer.parseInt(callId) : -1; |
| } |
| return -1; |
| } |
| |
| static boolean isHeartbeat(Throwable t) { |
| if (t instanceof StatusRuntimeException) { |
| final Metadata trailers = ((StatusRuntimeException)t).getTrailers(); |
| String isHeartbeat = trailers != null ? trailers.get(HEARTBEAT) : null; |
| return isHeartbeat != null && Boolean.valueOf(isHeartbeat); |
| } |
| return false; |
| } |
| |
| static IOException unwrapIOException(Throwable t) { |
| final IOException e; |
| if (t instanceof StatusRuntimeException) { |
| e = GrpcUtil.unwrapException((StatusRuntimeException) t); |
| } else { |
| e = IOUtils.asIOException(t); |
| } |
| return e; |
| } |
| |
| static <REPLY extends RaftClientReply, REPLY_PROTO> void asyncCall( |
| StreamObserver<REPLY_PROTO> responseObserver, |
| CheckedSupplier<CompletableFuture<REPLY>, IOException> supplier, |
| Function<REPLY, REPLY_PROTO> toProto) { |
| try { |
| supplier.get().whenCompleteAsync((reply, exception) -> { |
| if (exception != null) { |
| responseObserver.onError(GrpcUtil.wrapException(exception)); |
| } else { |
| responseObserver.onNext(toProto.apply(reply)); |
| responseObserver.onCompleted(); |
| } |
| }); |
| } catch (Exception e) { |
| responseObserver.onError(GrpcUtil.wrapException(e)); |
| } |
| } |
| |
| static void warn(Logger log, Supplier<String> message, Throwable t) { |
| LogUtils.warn(log, message, unwrapThrowable(t), StatusRuntimeException.class, ServerNotReadyException.class); |
| } |
| |
| class StatusRuntimeExceptionMetadataBuilder { |
| private Metadata trailers = new Metadata(); |
| |
| StatusRuntimeExceptionMetadataBuilder(Throwable t) { |
| trailers.put(EXCEPTION_TYPE_KEY, t.getClass().getCanonicalName()); |
| trailers.put(EXCEPTION_OBJECT_KEY, IOUtils.object2Bytes(t)); |
| } |
| |
| StatusRuntimeExceptionMetadataBuilder addCallId(long callId) { |
| if (callId > 0) { |
| trailers.put(CALL_ID, String.valueOf(callId)); |
| } |
| return this; |
| } |
| |
| StatusRuntimeExceptionMetadataBuilder addIsHeartbeat(boolean isHeartbeat) { |
| trailers.put(HEARTBEAT, String.valueOf(isHeartbeat)); |
| return this; |
| } |
| |
| Metadata build() { |
| return trailers; |
| } |
| } |
| |
| /** |
| * Tries to gracefully shut down the managed channel. Falls back to forceful shutdown if |
| * graceful shutdown times out. |
| */ |
| static void shutdownManagedChannel(ManagedChannel managedChannel) { |
| // Close the gRPC managed-channel if not shut down already. |
| if (!managedChannel.isShutdown()) { |
| try { |
| managedChannel.shutdown(); |
| if (!managedChannel.awaitTermination(3, TimeUnit.SECONDS)) { |
| LOG.warn("Timed out gracefully shutting down connection: {}. ", managedChannel); |
| } |
| } catch (Exception e) { |
| LOG.error("Unexpected exception while waiting for channel termination", e); |
| } |
| } |
| |
| // Forceful shut down if still not terminated. |
| if (!managedChannel.isTerminated()) { |
| try { |
| managedChannel.shutdownNow(); |
| if (!managedChannel.awaitTermination(2, TimeUnit.SECONDS)) { |
| LOG.warn("Timed out forcefully shutting down connection: {}. ", managedChannel); |
| } |
| } catch (Exception e) { |
| LOG.error("Unexpected exception while waiting for channel termination", e); |
| } |
| } |
| } |
| } |