| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.ratis.grpc.server; |
| |
| import org.apache.ratis.conf.RaftProperties; |
| import org.apache.ratis.grpc.GrpcConfigKeys; |
| import org.apache.ratis.grpc.GrpcTlsConfig; |
| import org.apache.ratis.grpc.metrics.intercept.server.MetricServerInterceptor; |
| import org.apache.ratis.protocol.RaftGroupId; |
| import org.apache.ratis.protocol.RaftPeerId; |
| import org.apache.ratis.rpc.SupportedRpcType; |
| import org.apache.ratis.server.RaftServer; |
| import org.apache.ratis.server.RaftServerConfigKeys; |
| import org.apache.ratis.server.RaftServerRpcWithProxy; |
| import org.apache.ratis.thirdparty.io.grpc.ServerInterceptors; |
| import org.apache.ratis.thirdparty.io.grpc.netty.GrpcSslContexts; |
| import org.apache.ratis.thirdparty.io.grpc.netty.NettyServerBuilder; |
| import org.apache.ratis.thirdparty.io.grpc.Server; |
| import org.apache.ratis.thirdparty.io.netty.channel.ChannelOption; |
| import org.apache.ratis.thirdparty.io.netty.handler.ssl.ClientAuth; |
| import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContextBuilder; |
| |
| import org.apache.ratis.proto.RaftProtos.*; |
| import org.apache.ratis.util.*; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.io.IOException; |
| import java.net.InetSocketAddress; |
| import java.util.HashMap; |
| import java.util.Map; |
| import java.util.concurrent.ExecutorService; |
| import java.util.function.Supplier; |
| |
| import static org.apache.ratis.thirdparty.io.netty.handler.ssl.SslProvider.OPENSSL; |
| |
| /** A grpc implementation of {@link org.apache.ratis.server.RaftServerRpc}. */ |
| public final class GrpcService extends RaftServerRpcWithProxy<GrpcServerProtocolClient, |
| PeerProxyMap<GrpcServerProtocolClient>> { |
| static final Logger LOG = LoggerFactory.getLogger(GrpcService.class); |
| public static final String GRPC_SEND_SERVER_REQUEST = |
| JavaUtils.getClassSimpleName(GrpcService.class) + ".sendRequest"; |
| |
| public static final class Builder { |
| private RaftServer server; |
| private GrpcTlsConfig tlsConfig; |
| private GrpcTlsConfig adminTlsConfig; |
| private GrpcTlsConfig clientTlsConfig; |
| private GrpcTlsConfig serverTlsConfig; |
| |
| private Builder() {} |
| |
| public Builder setServer(RaftServer raftServer) { |
| this.server = raftServer; |
| return this; |
| } |
| |
| public GrpcService build() { |
| return new GrpcService(server, adminTlsConfig, clientTlsConfig, serverTlsConfig); |
| } |
| |
| public Builder setTlsConfig(GrpcTlsConfig tlsConfig) { |
| this.tlsConfig = tlsConfig; |
| return this; |
| } |
| |
| public Builder setAdminTlsConfig(GrpcTlsConfig config) { |
| this.adminTlsConfig = config; |
| return this; |
| } |
| |
| public Builder setClientTlsConfig(GrpcTlsConfig config) { |
| this.clientTlsConfig = config; |
| return this; |
| } |
| |
| public Builder setServerTlsConfig(GrpcTlsConfig config) { |
| this.serverTlsConfig = config; |
| return this; |
| } |
| |
| public GrpcTlsConfig getTlsConfig() { |
| return tlsConfig; |
| } |
| } |
| |
| public static Builder newBuilder() { |
| return new Builder(); |
| } |
| |
| private final Map<String, Server> servers = new HashMap<>(); |
| private final Supplier<InetSocketAddress> addressSupplier; |
| private final Supplier<InetSocketAddress> clientServerAddressSupplier; |
| private final Supplier<InetSocketAddress> adminServerAddressSupplier; |
| |
| private final ExecutorService executor; |
| private final GrpcClientProtocolService clientProtocolService; |
| |
| private final MetricServerInterceptor serverInterceptor; |
| |
| public MetricServerInterceptor getServerInterceptor() { |
| return serverInterceptor; |
| } |
| |
| private GrpcService(RaftServer server, |
| GrpcTlsConfig adminTlsConfig, GrpcTlsConfig clientTlsConfig, GrpcTlsConfig serverTlsConfig) { |
| this(server, server::getId, |
| GrpcConfigKeys.Admin.host(server.getProperties()), |
| GrpcConfigKeys.Admin.port(server.getProperties()), |
| adminTlsConfig, |
| GrpcConfigKeys.Client.host(server.getProperties()), |
| GrpcConfigKeys.Client.port(server.getProperties()), |
| clientTlsConfig, |
| GrpcConfigKeys.Server.host(server.getProperties()), |
| GrpcConfigKeys.Server.port(server.getProperties()), |
| serverTlsConfig, |
| GrpcConfigKeys.messageSizeMax(server.getProperties(), LOG::info), |
| RaftServerConfigKeys.Log.Appender.bufferByteLimit(server.getProperties()), |
| GrpcConfigKeys.flowControlWindow(server.getProperties(), LOG::info), |
| RaftServerConfigKeys.Rpc.requestTimeout(server.getProperties()), |
| GrpcConfigKeys.Server.heartbeatChannel(server.getProperties())); |
| } |
| |
| @SuppressWarnings("checkstyle:ParameterNumber") // private constructor |
| private GrpcService(RaftServer raftServer, Supplier<RaftPeerId> idSupplier, |
| String adminHost, int adminPort, GrpcTlsConfig adminTlsConfig, |
| String clientHost, int clientPort, GrpcTlsConfig clientTlsConfig, |
| String serverHost, int serverPort, GrpcTlsConfig serverTlsConfig, |
| SizeInBytes grpcMessageSizeMax, SizeInBytes appenderBufferSize, |
| SizeInBytes flowControlWindow,TimeDuration requestTimeoutDuration, |
| boolean useSeparateHBChannel) { |
| super(idSupplier, id -> new PeerProxyMap<>(id.toString(), |
| p -> new GrpcServerProtocolClient(p, flowControlWindow.getSizeInt(), |
| requestTimeoutDuration, serverTlsConfig, useSeparateHBChannel))); |
| if (appenderBufferSize.getSize() > grpcMessageSizeMax.getSize()) { |
| throw new IllegalArgumentException("Illegal configuration: " |
| + RaftServerConfigKeys.Log.Appender.BUFFER_BYTE_LIMIT_KEY + " = " + appenderBufferSize |
| + " > " + GrpcConfigKeys.MESSAGE_SIZE_MAX_KEY + " = " + grpcMessageSizeMax); |
| } |
| |
| final RaftProperties properties = raftServer.getProperties(); |
| this.executor = ConcurrentUtils.newThreadPoolWithMax( |
| GrpcConfigKeys.Server.asyncRequestThreadPoolCached(properties), |
| GrpcConfigKeys.Server.asyncRequestThreadPoolSize(properties), |
| getId() + "-request-"); |
| this.clientProtocolService = new GrpcClientProtocolService(idSupplier, raftServer, executor); |
| |
| this.serverInterceptor = new MetricServerInterceptor( |
| idSupplier, |
| JavaUtils.getClassSimpleName(getClass()) + "_" + serverPort |
| ); |
| |
| final boolean separateAdminServer = adminPort != serverPort && adminPort > 0; |
| final boolean separateClientServer = clientPort != serverPort && clientPort > 0; |
| |
| final NettyServerBuilder serverBuilder = |
| startBuildingNettyServer(serverHost, serverPort, serverTlsConfig, grpcMessageSizeMax, flowControlWindow); |
| serverBuilder.addService(ServerInterceptors.intercept( |
| new GrpcServerProtocolService(idSupplier, raftServer), serverInterceptor)); |
| if (!separateAdminServer) { |
| addAdminService(raftServer, serverBuilder); |
| } |
| if (!separateClientServer) { |
| addClientService(serverBuilder); |
| } |
| |
| final Server server = serverBuilder.build(); |
| servers.put(GrpcServerProtocolService.class.getSimpleName(), server); |
| addressSupplier = newAddressSupplier(serverPort, server); |
| |
| if (separateAdminServer) { |
| final NettyServerBuilder builder = |
| startBuildingNettyServer(adminHost, adminPort, adminTlsConfig, grpcMessageSizeMax, flowControlWindow); |
| addAdminService(raftServer, builder); |
| final Server adminServer = builder.build(); |
| servers.put(GrpcAdminProtocolService.class.getName(), adminServer); |
| adminServerAddressSupplier = newAddressSupplier(adminPort, adminServer); |
| } else { |
| adminServerAddressSupplier = addressSupplier; |
| } |
| |
| if (separateClientServer) { |
| final NettyServerBuilder builder = |
| startBuildingNettyServer(clientHost, clientPort, clientTlsConfig, grpcMessageSizeMax, flowControlWindow); |
| addClientService(builder); |
| final Server clientServer = builder.build(); |
| servers.put(GrpcClientProtocolService.class.getName(), clientServer); |
| clientServerAddressSupplier = newAddressSupplier(clientPort, clientServer); |
| } else { |
| clientServerAddressSupplier = addressSupplier; |
| } |
| } |
| |
| private MemoizedSupplier<InetSocketAddress> newAddressSupplier(int port, Server server) { |
| return JavaUtils.memoize(() -> new InetSocketAddress(port != 0 ? port : server.getPort())); |
| } |
| |
| private void addClientService(NettyServerBuilder builder) { |
| builder.addService(ServerInterceptors.intercept(clientProtocolService, serverInterceptor)); |
| } |
| |
| private void addAdminService(RaftServer raftServer, NettyServerBuilder nettyServerBuilder) { |
| nettyServerBuilder.addService(ServerInterceptors.intercept( |
| new GrpcAdminProtocolService(raftServer), |
| serverInterceptor)); |
| } |
| |
| private static NettyServerBuilder startBuildingNettyServer(String hostname, int port, GrpcTlsConfig tlsConfig, |
| SizeInBytes grpcMessageSizeMax, SizeInBytes flowControlWindow) { |
| InetSocketAddress address = hostname == null || hostname.isEmpty() ? |
| new InetSocketAddress(port) : new InetSocketAddress(hostname, port); |
| NettyServerBuilder nettyServerBuilder = NettyServerBuilder.forAddress(address) |
| .withChildOption(ChannelOption.SO_REUSEADDR, true) |
| .maxInboundMessageSize(grpcMessageSizeMax.getSizeInt()) |
| .flowControlWindow(flowControlWindow.getSizeInt()); |
| |
| if (tlsConfig != null) { |
| SslContextBuilder sslContextBuilder = |
| tlsConfig.isFileBasedConfig()? |
| SslContextBuilder.forServer(tlsConfig.getCertChainFile(), |
| tlsConfig.getPrivateKeyFile()): |
| SslContextBuilder.forServer(tlsConfig.getPrivateKey(), |
| tlsConfig.getCertChain()); |
| if (tlsConfig.getMtlsEnabled()) { |
| sslContextBuilder.clientAuth(ClientAuth.REQUIRE); |
| if (tlsConfig.isFileBasedConfig()) { |
| sslContextBuilder.trustManager(tlsConfig.getTrustStoreFile()); |
| } else { |
| sslContextBuilder.trustManager(tlsConfig.getTrustStore()); |
| } |
| } |
| sslContextBuilder = GrpcSslContexts.configure(sslContextBuilder, OPENSSL); |
| try { |
| nettyServerBuilder.sslContext(sslContextBuilder.build()); |
| } catch (Exception ex) { |
| throw new IllegalArgumentException("Failed to build SslContext, tlsConfig=" + tlsConfig, ex); |
| } |
| } |
| return nettyServerBuilder; |
| } |
| |
| @Override |
| public SupportedRpcType getRpcType() { |
| return SupportedRpcType.GRPC; |
| } |
| |
| @Override |
| public void startImpl() { |
| for (Server server : servers.values()) { |
| try { |
| server.start(); |
| } catch (IOException e) { |
| ExitUtils.terminate(1, "Failed to start Grpc server", e, LOG); |
| } |
| LOG.info("{}: {} started, listening on {}", |
| getId(), JavaUtils.getClassSimpleName(getClass()), server.getPort()); |
| } |
| } |
| |
| @Override |
| public void closeImpl() throws IOException { |
| for (Map.Entry<String, Server> server : servers.entrySet()) { |
| final String name = getId() + ": shutdown server " + server.getKey(); |
| LOG.info("{} now", name); |
| final Server s = server.getValue().shutdownNow(); |
| super.closeImpl(); |
| try { |
| s.awaitTermination(); |
| } catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| throw IOUtils.toInterruptedIOException(name + " failed", e); |
| } |
| LOG.info("{} successfully", name); |
| } |
| |
| serverInterceptor.close(); |
| ConcurrentUtils.shutdownAndWait(executor); |
| } |
| |
| @Override |
| public void notifyNotLeader(RaftGroupId groupId) { |
| clientProtocolService.closeAllOrderedRequestStreamObservers(groupId); |
| } |
| |
| @Override |
| public InetSocketAddress getInetSocketAddress() { |
| return addressSupplier.get(); |
| } |
| |
| @Override |
| public InetSocketAddress getClientServerAddress() { |
| return clientServerAddressSupplier.get(); |
| } |
| |
| @Override |
| public InetSocketAddress getAdminServerAddress() { |
| return adminServerAddressSupplier.get(); |
| } |
| |
| @Override |
| public AppendEntriesReplyProto appendEntries(AppendEntriesRequestProto request) { |
| throw new UnsupportedOperationException( |
| "Blocking " + JavaUtils.getCurrentStackTraceElement().getMethodName() + " call is not supported"); |
| } |
| |
| @Override |
| public InstallSnapshotReplyProto installSnapshot(InstallSnapshotRequestProto request) { |
| throw new UnsupportedOperationException( |
| "Blocking " + JavaUtils.getCurrentStackTraceElement().getMethodName() + " call is not supported"); |
| } |
| |
| @Override |
| public RequestVoteReplyProto requestVote(RequestVoteRequestProto request) |
| throws IOException { |
| CodeInjectionForTesting.execute(GRPC_SEND_SERVER_REQUEST, getId(), |
| null, request); |
| |
| final RaftPeerId target = RaftPeerId.valueOf(request.getServerRequest().getReplyId()); |
| return getProxies().getProxy(target).requestVote(request); |
| } |
| |
| @Override |
| public StartLeaderElectionReplyProto startLeaderElection(StartLeaderElectionRequestProto request) throws IOException { |
| CodeInjectionForTesting.execute(GRPC_SEND_SERVER_REQUEST, getId(), null, request); |
| |
| final RaftPeerId target = RaftPeerId.valueOf(request.getServerRequest().getReplyId()); |
| return getProxies().getProxy(target).startLeaderElection(request); |
| } |
| |
| } |