blob: f42d3568fce94d5e228e0266b2ecc320eabe1d28 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ratis.grpc.server;
import org.apache.ratis.grpc.GrpcConfigKeys;
import org.apache.ratis.grpc.GrpcTlsConfig;
import org.apache.ratis.grpc.client.GrpcClientProtocolService;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.rpc.SupportedRpcType;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.RaftServerRpc;
import org.apache.ratis.server.impl.RaftServerRpcWithProxy;
import org.apache.ratis.thirdparty.io.grpc.netty.GrpcSslContexts;
import org.apache.ratis.thirdparty.io.grpc.netty.NettyServerBuilder;
import org.apache.ratis.thirdparty.io.grpc.Server;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelOption;
import org.apache.ratis.thirdparty.io.netty.handler.ssl.ClientAuth;
import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContextBuilder;
import org.apache.ratis.proto.RaftProtos.*;
import org.apache.ratis.util.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.function.Supplier;
import static org.apache.ratis.thirdparty.io.netty.handler.ssl.SslProvider.OPENSSL;
/** A grpc implementation of {@link RaftServerRpc}. */
public class GrpcService extends RaftServerRpcWithProxy<GrpcServerProtocolClient, PeerProxyMap<GrpcServerProtocolClient>> {
static final Logger LOG = LoggerFactory.getLogger(GrpcService.class);
public static final String GRPC_SEND_SERVER_REQUEST =
GrpcService.class.getSimpleName() + ".sendRequest";
public static class Builder extends RaftServerRpc.Builder<Builder, GrpcService> {
private GrpcTlsConfig tlsConfig;
private Builder() {}
@Override
public Builder getThis() {
return this;
}
@Override
public GrpcService build() {
return new GrpcService(getServer(), getTlsConfig());
}
public Builder setTlsConfig(GrpcTlsConfig tlsConfig) {
this.tlsConfig = tlsConfig;
return this;
}
public GrpcTlsConfig getTlsConfig() {
return tlsConfig;
}
}
public static Builder newBuilder() {
return new Builder();
}
private final Server server;
private final Supplier<InetSocketAddress> addressSupplier;
private final GrpcClientProtocolService clientProtocolService;
private GrpcService(RaftServer server, GrpcTlsConfig tlsConfig) {
this(server, server::getId,
GrpcConfigKeys.Server.port(server.getProperties()),
GrpcConfigKeys.messageSizeMax(server.getProperties(), LOG::info),
RaftServerConfigKeys.Log.Appender.bufferByteLimit(server.getProperties()),
GrpcConfigKeys.flowControlWindow(server.getProperties(), LOG::info),
RaftServerConfigKeys.Rpc.requestTimeout(server.getProperties()),
tlsConfig);
}
private GrpcService(RaftServer raftServer, Supplier<RaftPeerId> idSupplier, int port,
SizeInBytes grpcMessageSizeMax, SizeInBytes appenderBufferSize,
SizeInBytes flowControlWindow,TimeDuration requestTimeoutDuration, GrpcTlsConfig tlsConfig) {
super(idSupplier, id -> new PeerProxyMap<>(id.toString(),
p -> new GrpcServerProtocolClient(p, flowControlWindow.getSizeInt(),
requestTimeoutDuration, tlsConfig)));
if (appenderBufferSize.getSize() > grpcMessageSizeMax.getSize()) {
throw new IllegalArgumentException("Illegal configuration: "
+ RaftServerConfigKeys.Log.Appender.BUFFER_BYTE_LIMIT_KEY + " = " + appenderBufferSize
+ " > " + GrpcConfigKeys.MESSAGE_SIZE_MAX_KEY + " = " + grpcMessageSizeMax);
}
this.clientProtocolService = new GrpcClientProtocolService(idSupplier, raftServer);
NettyServerBuilder nettyServerBuilder = NettyServerBuilder.forPort(port)
.withChildOption(ChannelOption.SO_REUSEADDR, true)
.maxInboundMessageSize(grpcMessageSizeMax.getSizeInt())
.flowControlWindow(flowControlWindow.getSizeInt())
.addService(new GrpcServerProtocolService(idSupplier, raftServer))
.addService(clientProtocolService)
.addService(new GrpcAdminProtocolService(raftServer));
if (tlsConfig != null) {
SslContextBuilder sslContextBuilder =
SslContextBuilder.forServer(tlsConfig.getCertChain(),
tlsConfig.getPrivateKey());
if (tlsConfig.getMtlsEnabled()) {
sslContextBuilder.clientAuth(ClientAuth.REQUIRE);
sslContextBuilder.trustManager(tlsConfig.getCertChain());
}
sslContextBuilder = GrpcSslContexts.configure(sslContextBuilder, OPENSSL);
try {
nettyServerBuilder.sslContext(sslContextBuilder.build());
} catch (Exception ex) {
throw new IllegalArgumentException("Failed to build SslContext, tlsConfig=" + tlsConfig, ex);
}
}
server = nettyServerBuilder.build();
addressSupplier = JavaUtils.memoize(() -> new InetSocketAddress(port != 0? port: server.getPort()));
}
@Override
public SupportedRpcType getRpcType() {
return SupportedRpcType.GRPC;
}
@Override
public void startImpl() {
try {
server.start();
} catch (IOException e) {
ExitUtils.terminate(1, "Failed to start Grpc server", e, LOG);
}
LOG.info("{}: {} started, listening on {}", getId(), getClass().getSimpleName(), getInetSocketAddress());
}
@Override
public void closeImpl() throws IOException {
final String name = getId() + ": shutdown server with port " + server.getPort();
LOG.info("{} now", name);
final Server s = server.shutdownNow();
super.closeImpl();
try {
s.awaitTermination();
} catch(InterruptedException e) {
throw IOUtils.toInterruptedIOException(name + " failed", e);
}
LOG.info("{} successfully", name);
}
@Override
public void notifyNotLeader(RaftGroupId groupId) {
clientProtocolService.closeAllOrderedRequestStreamObservers(groupId);
}
@Override
public InetSocketAddress getInetSocketAddress() {
return addressSupplier.get();
}
@Override
public AppendEntriesReplyProto appendEntries(AppendEntriesRequestProto request) {
throw new UnsupportedOperationException(
"Blocking " + JavaUtils.getCurrentStackTraceElement().getMethodName() + " call is not supported");
}
@Override
public InstallSnapshotReplyProto installSnapshot(InstallSnapshotRequestProto request) {
throw new UnsupportedOperationException(
"Blocking " + JavaUtils.getCurrentStackTraceElement().getMethodName() + " call is not supported");
}
@Override
public RequestVoteReplyProto requestVote(RequestVoteRequestProto request)
throws IOException {
CodeInjectionForTesting.execute(GRPC_SEND_SERVER_REQUEST, getId(),
null, request);
final RaftPeerId target = RaftPeerId.valueOf(request.getServerRequest().getReplyId());
return getProxies().getProxy(target).requestVote(request);
}
}