blob: 92b685d6a89f0278c1d451cf0acc10968ab6c6c2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.om.ratis;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.ServiceException;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.security.cert.X509Certificate;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.StorageUnit;
import org.apache.hadoop.hdds.ratis.RatisHelper;
import org.apache.hadoop.hdds.security.x509.SecurityConfig;
import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
import org.apache.hadoop.hdds.tracing.TracingUtil;
import org.apache.hadoop.hdds.utils.HAUtils;
import org.apache.hadoop.ipc.ProtobufRpcEngine.Server;
import org.apache.hadoop.ozone.om.OMConfigKeys;
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.exceptions.OMLeaderNotReadyException;
import org.apache.hadoop.ozone.om.exceptions.OMNotLeaderException;
import org.apache.hadoop.ozone.om.helpers.OMNodeDetails;
import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status;
import org.apache.ratis.conf.Parameters;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.grpc.GrpcConfigKeys;
import org.apache.ratis.grpc.GrpcTlsConfig;
import org.apache.ratis.netty.NettyConfigKeys;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.SetConfigurationRequest;
import org.apache.ratis.protocol.exceptions.LeaderNotReadyException;
import org.apache.ratis.protocol.exceptions.NotLeaderException;
import org.apache.ratis.protocol.exceptions.StateMachineException;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.rpc.RpcType;
import org.apache.ratis.rpc.SupportedRpcType;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.util.LifeCycle;
import org.apache.ratis.util.SizeInBytes;
import org.apache.ratis.util.StringUtils;
import org.apache.ratis.util.TimeDuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.hadoop.ipc.RpcConstants.DUMMY_CLIENT_ID;
import static org.apache.hadoop.ipc.RpcConstants.INVALID_CALL_ID;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_HA_PREFIX;
/**
 * Creates and manages the Ratis server endpoint for an Ozone Manager (OM).
 *
 * <p>Write requests are wrapped into {@link RaftClientRequest}s, submitted to
 * the local {@link RaftServer}, and the replies are translated back into
 * {@link OMResponse}s. The class also keeps a local map of known Raft peers
 * (keyed by OM node id) and issues SetConfiguration requests to add OMs to,
 * or remove them from, the Ratis ring.
 */
public final class OzoneManagerRatisServer {
  private static final Logger LOG = LoggerFactory
      .getLogger(OzoneManagerRatisServer.class);

  private final int port;
  private final RaftServer server;
  private final RaftGroupId raftGroupId;
  private final RaftGroup raftGroup;
  private final RaftPeerId raftPeerId;
  // Known Ratis peers keyed by OM node id. Populated from the constructor's
  // peer list and later mutated by addRaftPeer / removeRaftPeer.
  private final Map<String, RaftPeer> raftPeerMap;
  private final OzoneManager ozoneManager;
  private final OzoneManagerStateMachine omStateMachine;
  private final String ratisStorageDir;
  // Client id used for the SetConfiguration requests issued by this server.
  private final ClientId clientId = ClientId.randomId();

  private static final AtomicLong CALL_ID_COUNTER = new AtomicLong();

  /**
   * Returns the next call id for SetConfiguration requests. The mask with
   * {@code Long.MAX_VALUE} keeps the id non-negative if the counter wraps.
   */
  private static long nextCallId() {
    return CALL_ID_COUNTER.getAndIncrement() & Long.MAX_VALUE;
  }

  /**
   * Returns an OM Ratis server. Instances are created through
   * {@link #newOMRatisServer}.
   *
   * @param conf configuration
   * @param om the OM instance starting the ratis server
   * @param raftGroupIdStr OM service id; the Raft group id is derived from it
   * @param localRaftPeerId raft peer id of this Ratis server
   * @param addr address of the ratis server; only its port is retained
   * @param peers peer nodes in the raft ring (empty when bootstrapping)
   * @param isBootstrapping whether the OM is starting in bootstrap mode
   * @param secConfig security configuration deciding whether gRPC TLS is used
   * @param certClient certificate client supplying TLS key material
   * @throws IOException if the storage directory, TLS parameters, state
   *     machine or Ratis server cannot be set up
   */
  @SuppressWarnings({"parameternumber", "java:S107"})
  private OzoneManagerRatisServer(ConfigurationSource conf, OzoneManager om,
      String raftGroupIdStr, RaftPeerId localRaftPeerId,
      InetSocketAddress addr, List<RaftPeer> peers, boolean isBootstrapping,
      SecurityConfig secConfig, CertificateClient certClient)
      throws IOException {
    this.ozoneManager = om;
    this.port = addr.getPort();
    this.ratisStorageDir = OzoneManagerRatisUtils.getOMRatisDirectory(conf);
    // newRaftProperties reads port and ratisStorageDir, so they must be
    // assigned before this call.
    RaftProperties serverProperties = newRaftProperties(conf);

    this.raftPeerId = localRaftPeerId;
    this.raftGroupId = RaftGroupId.valueOf(
        getRaftGroupIdFromOmServiceId(raftGroupIdStr));
    this.raftPeerMap = Maps.newHashMap();
    peers.forEach(e -> raftPeerMap.put(e.getId().toString(), e));
    this.raftGroup = RaftGroup.valueOf(raftGroupId, peers);

    if (isBootstrapping) {
      LOG.info("OM started in Bootstrap mode. Instantiating OM Ratis server " +
          "with groupID: {}", raftGroupIdStr);
    } else {
      LOG.info("Instantiating OM Ratis server with groupID: {} and peers: {}",
          raftGroupIdStr, joinPeerAddresses(peers));
    }

    this.omStateMachine = getStateMachine(conf);

    Parameters parameters = createServerTlsParameters(secConfig, certClient);
    this.server = RaftServer.newBuilder()
        .setServerId(this.raftPeerId)
        .setGroup(this.raftGroup)
        .setProperties(serverProperties)
        .setParameters(parameters)
        .setStateMachine(omStateMachine)
        .build();
  }

  /**
   * Formats the addresses of the given peers as a ", "-separated list.
   * Returns an empty string for an empty list.
   */
  private static String joinPeerAddresses(List<RaftPeer> peers) {
    StringBuilder joined = new StringBuilder();
    String separator = "";
    for (RaftPeer peer : peers) {
      joined.append(separator).append(peer.getAddress());
      separator = ", ";
    }
    return joined.toString();
  }

  /**
   * Creates an instance of OzoneManagerRatisServer.
   *
   * <p>The OM service id is used as the Raft group id. On a regular startup,
   * the local OM and every entry of {@code peerNodes} become members of the
   * Raft group. In bootstrap mode the group starts with no peers.
   *
   * @param ozoneConf configuration
   * @param omProtocol the OM instance owning the server
   * @param omNodeDetails details of the local OM node
   * @param peerNodes other OM nodes of the same service, keyed by node id
   * @param secConfig security configuration
   * @param certClient certificate client used for TLS
   * @param isBootstrapping whether the OM is starting in bootstrap mode
   * @return the new (not yet started) Ratis server wrapper
   * @throws IOException if the server cannot be created
   */
  public static OzoneManagerRatisServer newOMRatisServer(
      ConfigurationSource ozoneConf, OzoneManager omProtocol,
      OMNodeDetails omNodeDetails, Map<String, OMNodeDetails> peerNodes,
      SecurityConfig secConfig, CertificateClient certClient,
      boolean isBootstrapping) throws IOException {

    // RaftGroupId is the omServiceId
    String omServiceId = omNodeDetails.getServiceId();

    String omNodeId = omNodeDetails.getNodeId();
    RaftPeerId localRaftPeerId = RaftPeerId.getRaftPeerId(omNodeId);

    InetSocketAddress ratisAddr = new InetSocketAddress(
        omNodeDetails.getInetAddress(), omNodeDetails.getRatisPort());

    RaftPeer localRaftPeer = RaftPeer.newBuilder()
        .setId(localRaftPeerId)
        .setAddress(ratisAddr)
        .build();

    // If OM is started in bootstrap mode, do not add peers to the RaftGroup.
    // Raft peers will be added after SetConfiguration transaction is
    // committed by leader and propagated to followers.
    List<RaftPeer> raftPeers = new ArrayList<>();
    if (!isBootstrapping) {
      // On regular startup, add all OMs to Ratis ring
      raftPeers.add(localRaftPeer);

      for (Map.Entry<String, OMNodeDetails> peerInfo : peerNodes.entrySet()) {
        String peerNodeId = peerInfo.getKey();
        OMNodeDetails peerNode = peerInfo.getValue();
        RaftPeerId raftPeerId = RaftPeerId.valueOf(peerNodeId);
        RaftPeer raftPeer;
        if (peerNode.isHostUnresolved()) {
          // The host could not be resolved, so pass the "host:port" string
          // instead of an InetSocketAddress.
          raftPeer = RaftPeer.newBuilder()
              .setId(raftPeerId)
              .setAddress(peerNode.getRatisHostPortStr())
              .build();
        } else {
          InetSocketAddress peerRatisAddr = new InetSocketAddress(
              peerNode.getInetAddress(), peerNode.getRatisPort());
          raftPeer = RaftPeer.newBuilder()
              .setId(raftPeerId)
              .setAddress(peerRatisAddr)
              .build();
        }

        // Add other OM nodes belonging to the same OM service to the Ratis ring
        raftPeers.add(raftPeer);
      }
    }

    return new OzoneManagerRatisServer(ozoneConf, omProtocol, omServiceId,
        localRaftPeerId, ratisAddr, raftPeers, isBootstrapping, secConfig,
        certClient);
  }

  /**
   * Submit request to Ratis server.
   *
   * <p>If the OM prepare state does not allow the request's command type, the
   * request is not submitted. A failed response with status
   * {@code NOT_SUPPORTED_OPERATION_WHEN_PREPARED} is returned instead.
   *
   * @param omRequest the OM write request
   * @return OMResponse - response returned to the client.
   * @throws ServiceException if submission fails or the reply signals a
   *     leadership problem
   */
  public OMResponse submitRequest(OMRequest omRequest) throws ServiceException {
    // In prepare mode, only prepare and cancel requests are allowed to go
    // through.
    if (ozoneManager.getPrepareState().requestAllowed(omRequest.getCmdType())) {
      RaftClientRequest raftClientRequest =
          createWriteRaftClientRequest(omRequest);
      RaftClientReply raftClientReply = submitRequestToRatis(raftClientRequest);
      return processReply(omRequest, raftClientReply);
    } else {
      LOG.info("Rejecting write request on OM {} because it is in prepare " +
          "mode: {}", ozoneManager.getOMNodeId(),
          omRequest.getCmdType().name());

      String message = "Cannot apply write request " +
          omRequest.getCmdType().name() + " when OM is in prepare mode.";
      OMResponse.Builder omResponse = OMResponse.newBuilder()
          .setMessage(message)
          .setStatus(Status.NOT_SUPPORTED_OPERATION_WHEN_PREPARED)
          .setCmdType(omRequest.getCmdType())
          .setTraceID(omRequest.getTraceID())
          .setSuccess(false);
      return omResponse.build();
    }
  }

  /**
   * API used internally from OzoneManager Server when requests needs to be
   * submitted to ratis, where the crafted RaftClientRequest is passed along.
   * Unlike {@link #submitRequest(OMRequest)}, no prepare-mode check is done.
   *
   * @param omRequest the OM request, used to build error responses
   * @param raftClientRequest the pre-built Ratis request to submit
   * @return OMResponse
   * @throws ServiceException if submission fails or the reply signals a
   *     leadership problem
   */
  public OMResponse submitRequest(OMRequest omRequest,
      RaftClientRequest raftClientRequest) throws ServiceException {
    RaftClientReply raftClientReply = submitRequestToRatis(raftClientRequest);
    return processReply(omRequest, raftClientReply);
  }

  /**
   * Submits the request to the local Ratis server and blocks for the reply.
   * Failures are wrapped in a {@link ServiceException}. On interruption, the
   * thread's interrupt flag is restored before rethrowing.
   */
  private RaftClientReply submitRequestToRatis(
      RaftClientRequest raftClientRequest) throws ServiceException {
    try {
      return server.submitClientRequestAsync(raftClientRequest)
          .get();
    } catch (ExecutionException | IOException ex) {
      throw new ServiceException(ex.getMessage(), ex);
    } catch (InterruptedException ex) {
      Thread.currentThread().interrupt();
      throw new ServiceException(ex.getMessage(), ex);
    }
  }

  /**
   * Add new OM to the Ratis ring by submitting a SetConfiguration request
   * whose peer list is the current {@code raftPeerMap} plus the new node.
   *
   * @param newOMNode details of the OM to add
   * @throws IOException if the SetConfiguration reply is not successful
   */
  public void addOMToRatisRing(OMNodeDetails newOMNode) throws IOException {

    Preconditions.checkNotNull(newOMNode);

    String newOMNodeId = newOMNode.getNodeId();
    RaftPeerId newOMRaftPeerId = RaftPeerId.valueOf(newOMNodeId);
    InetSocketAddress newOMRatisAddr = new InetSocketAddress(
        newOMNode.getHostAddress(), newOMNode.getRatisPort());
    RaftPeer newRaftPeer = RaftPeer.newBuilder()
        .setId(newOMRaftPeerId)
        .setAddress(newOMRatisAddr)
        .build();

    LOG.info("{}: Submitting SetConfiguration request to Ratis server to add" +
            " new OM peer {} to the Ratis group {}", ozoneManager.getOMNodeId(),
        newRaftPeer, raftGroup);

    List<RaftPeer> newPeersList = new ArrayList<>(raftPeerMap.values());
    newPeersList.add(newRaftPeer);

    // NOTE(review): the status returned by checkLeaderStatus() is discarded.
    // Presumably a non-leader is rejected through the SetConfiguration reply
    // checked below; confirm whether an explicit leader check was intended.
    checkLeaderStatus();
    SetConfigurationRequest request = new SetConfigurationRequest(clientId,
        server.getId(), raftGroupId, nextCallId(), newPeersList);

    RaftClientReply raftClientReply = server.setConfiguration(request);
    if (raftClientReply.isSuccess()) {
      LOG.info("Added OM {} to Ratis group {}.", newOMNodeId, raftGroupId);
    } else {
      LOG.error("Failed to add OM {} to Ratis group {}. Ratis " +
              "SetConfiguration reply: {}", newOMNodeId, raftGroupId,
          raftClientReply);
      throw new IOException("Failed to add OM " + newOMNodeId + " to Ratis " +
          "ring.");
    }
  }

  /**
   * Remove decommissioned OM node from Ratis ring by submitting a
   * SetConfiguration request whose peer list is the current
   * {@code raftPeerMap} without the given node.
   *
   * <p>If the node is not in the local peer list, a warning is logged and the
   * request is still submitted with the unchanged peer set. This keeps the
   * call tolerant of repeated removals.
   *
   * @param removeOMNode details of the OM to remove
   * @throws IOException if the SetConfiguration reply is not successful
   */
  public void removeOMFromRatisRing(OMNodeDetails removeOMNode)
      throws IOException {
    Preconditions.checkNotNull(removeOMNode);

    String removeNodeId = removeOMNode.getNodeId();
    LOG.info("{}: Submitting SetConfiguration request to Ratis server to " +
            "remove OM peer {} from Ratis group {}", ozoneManager.getOMNodeId(),
        removeNodeId, raftGroup);

    RaftPeer removePeer = raftPeerMap.get(removeNodeId);
    if (removePeer == null) {
      // Without this warning, the success path below would log
      // "Removed OM ..." even though the peer set did not change.
      LOG.warn("{}: OM {} is not in the Ratis peers list {}. The " +
              "SetConfiguration request will not change the peer set.",
          ozoneManager.getOMNodeId(), removeNodeId, raftPeerMap.keySet());
    }
    List<RaftPeer> newPeersList = new ArrayList<>(raftPeerMap.values());
    newPeersList.remove(removePeer);

    // NOTE(review): the status returned by checkLeaderStatus() is discarded.
    // Presumably a non-leader is rejected through the SetConfiguration reply
    // checked below; confirm whether an explicit leader check was intended.
    checkLeaderStatus();
    SetConfigurationRequest request = new SetConfigurationRequest(clientId,
        server.getId(), raftGroupId, nextCallId(), newPeersList);

    RaftClientReply raftClientReply = server.setConfiguration(request);
    if (raftClientReply.isSuccess()) {
      LOG.info("Removed OM {} from Ratis group {}.", removeNodeId,
          raftGroupId);
    } else {
      LOG.error("Failed to remove OM {} from Ratis group {}. Ratis " +
              "SetConfiguration reply: {}", removeNodeId, raftGroupId,
          raftClientReply);
      throw new IOException("Failed to remove OM " + removeNodeId + " from " +
          "Ratis ring.");
    }
  }

  /**
   * Return a list of peer NodeIds. The list is a new copy, so changes to it do
   * not affect the server's peer map.
   */
  public List<String> getPeerIds() {
    return new ArrayList<>(raftPeerMap.keySet());
  }

  /**
   * Check if the input peerId exists in the peers list.
   * @return true if the nodeId is self or it exists in peer node list,
   *         false otherwise.
   */
  @VisibleForTesting
  public boolean doesPeerExist(String peerId) {
    return raftPeerMap.containsKey(peerId);
  }

  /**
   * Add given node to the local list of RaftPeers. This does not submit any
   * Ratis configuration change.
   */
  public void addRaftPeer(OMNodeDetails omNodeDetails) {
    InetSocketAddress newOMRatisAddr = new InetSocketAddress(
        omNodeDetails.getHostAddress(), omNodeDetails.getRatisPort());
    String newNodeId = omNodeDetails.getNodeId();
    RaftPeerId newPeerId = RaftPeerId.valueOf(newNodeId);
    RaftPeer raftPeer = RaftPeer.newBuilder()
        .setId(newPeerId)
        .setAddress(newOMRatisAddr)
        .build();
    raftPeerMap.put(newNodeId, raftPeer);

    LOG.info("Added OM {} to Ratis Peers list.", newNodeId);
  }

  /**
   * Remove given node from the local list of RaftPeers. This does not submit
   * any Ratis configuration change.
   */
  public void removeRaftPeer(OMNodeDetails omNodeDetails) {
    String removeNodeID = omNodeDetails.getNodeId();
    raftPeerMap.remove(removeNodeID);
    // Identify this OM by node id, consistent with the other log lines
    // (the class does not override toString()).
    LOG.info("{}: Removed OM {} from Ratis Peers list.",
        ozoneManager.getOMNodeId(), removeNodeID);
  }

  /**
   * Create Write RaftClient request from OMRequest.
   *
   * <p>The Raft client id is derived from the Hadoop RPC client id, and the
   * Raft call id is the Hadoop RPC call id. Outside the secure-OM test mode,
   * preconditions require both ids to differ from the dummy/invalid
   * sentinels. The client-id check compares array references.
   * NOTE(review): the reference comparison presumably detects calls made
   * outside an RPC handler; confirm.
   *
   * @param omRequest the OM request to serialize into the Raft message
   * @return RaftClientRequest - Raft Client request which is submitted to
   *         ratis server.
   */
  private RaftClientRequest createWriteRaftClientRequest(OMRequest omRequest) {
    if (!ozoneManager.isTestSecureOmFlag()) {
      Preconditions.checkArgument(Server.getClientId() != DUMMY_CLIENT_ID);
      Preconditions.checkArgument(Server.getCallId() != INVALID_CALL_ID);
    }
    return RaftClientRequest.newBuilder()
        .setClientId(
            ClientId.valueOf(UUID.nameUUIDFromBytes(Server.getClientId())))
        .setServerId(server.getId())
        .setGroupId(raftGroupId)
        .setCallId(Server.getCallId())
        .setMessage(
            Message.valueOf(
                OMRatisHelper.convertRequestToByteString(omRequest)))
        .setType(RaftClientRequest.writeRequestType())
        .build();
  }

  /**
   * Process the raftClientReply and return OMResponse.
   *
   * <p>For unsuccessful replies:
   * <ul>
   *   <li>a NotLeaderException is rethrown as an {@link OMNotLeaderException}
   *       wrapped in a {@link ServiceException};</li>
   *   <li>a LeaderNotReadyException is rethrown as an
   *       {@link OMLeaderNotReadyException} wrapped in a
   *       {@link ServiceException};</li>
   *   <li>a StateMachineException is converted into a failed
   *       {@link OMResponse}.</li>
   * </ul>
   * In all other cases the response is decoded from the reply message.
   *
   * @param omRequest the original request, used to build error responses
   * @param reply the Ratis reply
   * @return OMResponse - response which is returned to client.
   * @throws ServiceException on leadership errors or an undecodable reply
   */
  private OMResponse processReply(OMRequest omRequest, RaftClientReply reply)
      throws ServiceException {
    // NotLeader exception is thrown only when the raft server to which the
    // request is submitted is not the leader. This can happen first time
    // when client is submitting request to OM.

    if (!reply.isSuccess()) {
      NotLeaderException notLeaderException = reply.getNotLeaderException();
      if (notLeaderException != null) {
        throw new ServiceException(
            OMNotLeaderException.convertToOMNotLeaderException(
                  notLeaderException, getRaftPeerId()));
      }

      LeaderNotReadyException leaderNotReadyException =
          reply.getLeaderNotReadyException();
      if (leaderNotReadyException != null) {
        throw new ServiceException(new OMLeaderNotReadyException(
            leaderNotReadyException.getMessage()));
      }

      StateMachineException stateMachineException =
          reply.getStateMachineException();
      if (stateMachineException != null) {
        OMResponse.Builder omResponse = OMResponse.newBuilder()
            .setCmdType(omRequest.getCmdType())
            .setSuccess(false)
            .setTraceID(omRequest.getTraceID());
        if (stateMachineException.getCause() != null) {
          omResponse.setMessage(stateMachineException.getCause().getMessage());
          omResponse.setStatus(
              exceptionToResponseStatus(stateMachineException.getCause()));
        } else {
          // Current Ratis sets the cause; this branch is a defensive check.
          LOG.error("StateMachine exception cause is not set");
          omResponse.setStatus(
              OzoneManagerProtocolProtos.Status.INTERNAL_ERROR);
          omResponse.setMessage(
              StringUtils.stringifyException(stateMachineException));
        }

        if (LOG.isDebugEnabled()) {
          LOG.debug("Error while executing ratis request. " +
              "stateMachineException: ", stateMachineException);
        }
        return omResponse.build();
      }
    }

    try {
      return OMRatisHelper.getOMResponseFromRaftClientReply(reply);
    } catch (InvalidProtocolBufferException ex) {
      if (ex.getMessage() != null) {
        throw new ServiceException(ex.getMessage(), ex);
      } else {
        throw new ServiceException(ex);
      }
    }

    // TODO: Still need to handle RaftRetry failure exception and
    //  NotReplicated exception.
  }

  /**
   * Convert exception to {@link OzoneManagerProtocolProtos.Status}.
   *
   * <p>An {@link OMException} is mapped by the ordinal of its result code.
   * This assumes the result-code enum and the proto Status enum declare their
   * constants in the same order. NOTE(review): a name-based mapping would be
   * less fragile. Any other throwable is logged and mapped to
   * {@code INTERNAL_ERROR}.
   *
   * @param cause - Cause from stateMachine exception
   * @return {@link OzoneManagerProtocolProtos.Status}
   */
  private OzoneManagerProtocolProtos.Status exceptionToResponseStatus(
      Throwable cause) {
    if (cause instanceof OMException) {
      return OzoneManagerProtocolProtos.Status.values()[
          ((OMException) cause).getResult().ordinal()];
    } else {
      LOG.error("Unknown error occurs", cause);
      return OzoneManagerProtocolProtos.Status.INTERNAL_ERROR;
    }
  }

  /** Returns the Raft group this server was created with. */
  public RaftGroup getRaftGroup() {
    return this.raftGroup;
  }

  @VisibleForTesting
  public RaftServer getServer() {
    return server;
  }

  /**
   * Initializes and returns OzoneManager StateMachine. Tracing is enabled
   * according to the configuration.
   */
  private OzoneManagerStateMachine getStateMachine(ConfigurationSource conf)
      throws IOException {
    return new OzoneManagerStateMachine(this,
        TracingUtil.isTracingEnabled(conf));
  }

  @VisibleForTesting
  public OzoneManagerStateMachine getOmStateMachine() {
    return omStateMachine;
  }

  public OzoneManager getOzoneManager() {
    return ozoneManager;
  }

  /**
   * Start the Ratis server.
   * @throws IOException if the Ratis server fails to start
   */
  public void start() throws IOException {
    LOG.info("Starting {} {} at port {}", getClass().getSimpleName(),
        server.getId(), port);
    server.start();
  }

  /**
   * Closes the Ratis server and then the OM state machine. An IOException is
   * rethrown as a RuntimeException. If closing the server throws, closing the
   * state machine is not attempted.
   */
  public void stop() {
    try {
      server.close();
      omStateMachine.close();
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  /**
   * Builds the Ratis server properties from the OM configuration. Every
   * configuration key prefixed with {@code OZONE_OM_HA_PREFIX + "."} is
   * applied last, with the prefix trimmed, so those keys override the values
   * set here.
   */
  @SuppressWarnings("methodlength")
  private RaftProperties newRaftProperties(ConfigurationSource conf) {

    // Set RPC type
    final String rpcType = conf.get(
        OMConfigKeys.OZONE_OM_RATIS_RPC_TYPE_KEY,
        OMConfigKeys.OZONE_OM_RATIS_RPC_TYPE_DEFAULT);
    final RpcType rpc = SupportedRpcType.valueOfIgnoreCase(rpcType);
    final RaftProperties properties = RatisHelper.newRaftProperties(rpc);

    // Set the ratis port number
    if (rpc == SupportedRpcType.GRPC) {
      GrpcConfigKeys.Server.setPort(properties, port);
    } else if (rpc == SupportedRpcType.NETTY) {
      NettyConfigKeys.Server.setPort(properties, port);
    }

    // Set Ratis storage directory
    RaftServerConfigKeys.setStorageDir(properties,
        Collections.singletonList(new File(ratisStorageDir)));

    // Disable/enable the pre vote feature in Ratis
    RaftServerConfigKeys.LeaderElection.setPreVote(properties,
        conf.getBoolean(OMConfigKeys.OZONE_OM_RATIS_SERVER_ELECTION_PRE_VOTE,
            OMConfigKeys.OZONE_OM_RATIS_SERVER_ELECTION_PRE_VOTE_DEFAULT));

    // Set RAFT segment size
    final long raftSegmentSize = (long) conf.getStorageSize(
        OMConfigKeys.OZONE_OM_RATIS_SEGMENT_SIZE_KEY,
        OMConfigKeys.OZONE_OM_RATIS_SEGMENT_SIZE_DEFAULT,
        StorageUnit.BYTES);
    RaftServerConfigKeys.Log.setSegmentSizeMax(properties,
        SizeInBytes.valueOf(raftSegmentSize));
    RaftServerConfigKeys.Log.setPurgeUptoSnapshotIndex(properties, true);

    // Set RAFT segment pre-allocated size
    final long raftSegmentPreallocatedSize = (long) conf.getStorageSize(
        OMConfigKeys.OZONE_OM_RATIS_SEGMENT_PREALLOCATED_SIZE_KEY,
        OMConfigKeys.OZONE_OM_RATIS_SEGMENT_PREALLOCATED_SIZE_DEFAULT,
        StorageUnit.BYTES);
    int logAppenderQueueNumElements = conf.getInt(
        OMConfigKeys.OZONE_OM_RATIS_LOG_APPENDER_QUEUE_NUM_ELEMENTS,
        OMConfigKeys.OZONE_OM_RATIS_LOG_APPENDER_QUEUE_NUM_ELEMENTS_DEFAULT);
    final int logAppenderQueueByteLimit = (int) conf.getStorageSize(
        OMConfigKeys.OZONE_OM_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT,
        OMConfigKeys.OZONE_OM_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT_DEFAULT,
        StorageUnit.BYTES);
    RaftServerConfigKeys.Log.Appender.setBufferElementLimit(properties,
        logAppenderQueueNumElements);
    RaftServerConfigKeys.Log.Appender.setBufferByteLimit(properties,
        SizeInBytes.valueOf(logAppenderQueueByteLimit));
    RaftServerConfigKeys.Log.setPreallocatedSize(properties,
        SizeInBytes.valueOf(raftSegmentPreallocatedSize));
    RaftServerConfigKeys.Log.Appender.setInstallSnapshotEnabled(properties,
        false);
    final int logPurgeGap = conf.getInt(
        OMConfigKeys.OZONE_OM_RATIS_LOG_PURGE_GAP,
        OMConfigKeys.OZONE_OM_RATIS_LOG_PURGE_GAP_DEFAULT);
    RaftServerConfigKeys.Log.setPurgeGap(properties, logPurgeGap);

    // For grpc set the maximum message size
    // TODO: calculate the optimal max message size
    GrpcConfigKeys.setMessageSizeMax(properties,
        SizeInBytes.valueOf(logAppenderQueueByteLimit));

    // Set the server request timeout
    final TimeDuration serverRequestTimeout = readTimeDuration(conf,
        OMConfigKeys.OZONE_OM_RATIS_SERVER_REQUEST_TIMEOUT_KEY,
        OMConfigKeys.OZONE_OM_RATIS_SERVER_REQUEST_TIMEOUT_DEFAULT);
    RaftServerConfigKeys.Rpc.setRequestTimeout(properties,
        serverRequestTimeout);

    // Set timeout for server retry cache entry
    final TimeDuration retryCacheTimeout = readTimeDuration(conf,
        OMConfigKeys.OZONE_OM_RATIS_SERVER_RETRY_CACHE_TIMEOUT_KEY,
        OMConfigKeys.OZONE_OM_RATIS_SERVER_RETRY_CACHE_TIMEOUT_DEFAULT);
    RaftServerConfigKeys.RetryCache.setExpiryTime(properties,
        retryCacheTimeout);

    // Set the server min and max timeout. The max timeout is always the
    // configured min timeout plus 200 ms.
    final TimeDuration serverMinTimeout = readTimeDuration(conf,
        OMConfigKeys.OZONE_OM_RATIS_MINIMUM_TIMEOUT_KEY,
        OMConfigKeys.OZONE_OM_RATIS_MINIMUM_TIMEOUT_DEFAULT);
    long serverMaxTimeoutDuration =
        serverMinTimeout.toLong(TimeUnit.MILLISECONDS) + 200;
    final TimeDuration serverMaxTimeout = TimeDuration.valueOf(
        serverMaxTimeoutDuration, TimeUnit.MILLISECONDS);
    RaftServerConfigKeys.Rpc.setTimeoutMin(properties,
        serverMinTimeout);
    RaftServerConfigKeys.Rpc.setTimeoutMax(properties,
        serverMaxTimeout);

    // Set the number of maximum cached segments
    RaftServerConfigKeys.Log.setSegmentCacheNumMax(properties, 2);

    // TODO: set max write buffer size

    // The node failure timeout drives both the no-leader notification timeout
    // and the RPC slowness timeout.
    final TimeDuration nodeFailureTimeout = readTimeDuration(conf,
        OMConfigKeys.OZONE_OM_RATIS_SERVER_FAILURE_TIMEOUT_DURATION_KEY,
        OMConfigKeys.OZONE_OM_RATIS_SERVER_FAILURE_TIMEOUT_DURATION_DEFAULT);
    RaftServerConfigKeys.Notification.setNoLeaderTimeout(properties,
        nodeFailureTimeout);
    RaftServerConfigKeys.Rpc.setSlownessTimeout(properties,
        nodeFailureTimeout);

    // Set auto trigger snapshot. We don't need to configure auto trigger
    // threshold in OM, as last applied index is flushed during double buffer
    // flush automatically. (But added this property internally, so that this
    // helps during testing, when want to trigger snapshots frequently, and
    // which will purge logs when purge gap condition is satisfied and which
    // will trigger installSnapshot when logs are cleaned up.)
    // The transaction info value in OM DB is used as
    // snapshot value after restart.

    RaftServerConfigKeys.Snapshot.setAutoTriggerEnabled(
        properties, true);

    long snapshotAutoTriggerThreshold = conf.getLong(
        OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_AUTO_TRIGGER_THRESHOLD_KEY,
        OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_AUTO_TRIGGER_THRESHOLD_DEFAULT);

    RaftServerConfigKeys.Snapshot.setAutoTriggerThreshold(properties,
        snapshotAutoTriggerThreshold);

    createRaftServerProperties(conf, properties);
    return properties;
  }

  /**
   * Reads a time duration from the configuration, using the unit of the
   * default value both to interpret the configured value and for the result.
   *
   * @param conf configuration to read from
   * @param key configuration key
   * @param defaultValue default used when the key is not set
   * @return the configured duration in the default value's unit
   */
  private static TimeDuration readTimeDuration(ConfigurationSource conf,
      String key, TimeDuration defaultValue) {
    TimeUnit unit = defaultValue.getUnit();
    long duration = conf.getTimeDuration(key, defaultValue.getDuration(),
        unit);
    return TimeDuration.valueOf(duration, unit);
  }

  /**
   * Copies every {@code OZONE_OM_HA_PREFIX}-prefixed configuration entry,
   * with the prefix trimmed, into the Raft properties. Entries overwrite
   * values already set.
   */
  private void createRaftServerProperties(ConfigurationSource ozoneConf,
      RaftProperties raftProperties) {
    Map<String, String> ratisServerConf =
        getOMHAConfigs(ozoneConf);
    ratisServerConf.forEach((key, val) -> raftProperties.set(key, val));
  }

  /**
   * Returns the configuration entries whose keys start with
   * {@code OZONE_OM_HA_PREFIX + "."}, with that prefix removed.
   */
  private static Map<String, String> getOMHAConfigs(
      ConfigurationSource configuration) {
    return configuration
        .getPropsMatchPrefixAndTrimPrefix(OZONE_OM_HA_PREFIX + ".");
  }

  /**
   * Returns the leader of the Raft group as seen by this server.
   *
   * <p>If this server is the leader, its own peer is returned. Otherwise the
   * leader id is taken from the follower info. When that id is empty,
   * {@code null} is returned. Otherwise the matching peer is looked up in the
   * current Raft configuration.
   *
   * @return the leader peer, or {@code null} when no leader id is known
   * @throws IOException if the Raft division cannot be obtained
   */
  public RaftPeer getLeader() throws IOException {
    RaftServer.Division division = server.getDivision(raftGroupId);
    if (division.getInfo().isLeader()) {
      return division.getPeer();
    } else {
      ByteString leaderId = division.getInfo().getRoleInfoProto()
          .getFollowerInfo().getLeaderInfo().getId().getId();
      return leaderId.isEmpty() ? null :
          division.getRaftConf().getPeer(RaftPeerId.valueOf(leaderId));
    }
  }

  /**
   * Defines RaftServer Status.
   */
  public enum RaftServerStatus {
    /** This server is not the leader, or its status could not be read. */
    NOT_LEADER,
    /** This server is the leader but not yet ready. */
    LEADER_AND_NOT_READY,
    /** This server is the leader and ready. */
    LEADER_AND_READY;
  }

  /**
   * Check Leader status and return the state of the RaftServer.
   *
   * <p>If the Raft division is missing, or fetching it throws an
   * IOException, the error (if any) is logged and {@code NOT_LEADER} is
   * returned.
   *
   * @return RaftServerStatus.
   */
  public RaftServerStatus checkLeaderStatus() {
    try {
      RaftServer.Division division = server.getDivision(raftGroupId);
      if (division != null) {
        if (!division.getInfo().isLeader()) {
          return RaftServerStatus.NOT_LEADER;
        } else if (division.getInfo().isLeaderReady()) {
          return RaftServerStatus.LEADER_AND_READY;
        } else {
          return RaftServerStatus.LEADER_AND_NOT_READY;
        }
      }
    } catch (IOException ioe) {
      // In this case we return not a leader.
      LOG.error("Fail to get RaftServer impl and therefore it's not clear " +
          "whether it's leader. ", ioe);
    }
    return RaftServerStatus.NOT_LEADER;
  }

  /**
   * Get list of peer NodeIds from Ratis.
   *
   * <p>Unlike {@link #getPeerIds()}, which reads the locally tracked peer
   * map, this reads the current peers of the Raft configuration.
   *
   * @return List of Peer NodeId's.
   * @throws IOException if the Raft division cannot be obtained
   */
  @VisibleForTesting
  public List<String> getCurrentPeersFromRaftConf() throws IOException {
    try {
      Collection<RaftPeer> currentPeers =
          server.getDivision(raftGroupId).getRaftConf().getCurrentPeers();
      List<String> currentPeerList = new ArrayList<>();
      currentPeers.forEach(e -> currentPeerList.add(e.getId().toString()));
      return currentPeerList;
    } catch (IOException e) {
      // Rethrow with context about what was being read.
      throw new IOException("Failed to get peer information from Ratis.", e);
    }
  }

  public int getServerPort() {
    return port;
  }

  @VisibleForTesting
  public LifeCycle.State getServerState() {
    return server.getLifeCycleState();
  }

  @VisibleForTesting
  public RaftPeerId getRaftPeerId() {
    return this.raftPeerId;
  }

  /**
   * Derives the Raft group UUID from the OM service id. The result is the
   * name-based UUID of the id's UTF-8 bytes, so it is deterministic for a
   * given service id.
   */
  public static UUID getRaftGroupIdFromOmServiceId(String omServiceId) {
    return UUID.nameUUIDFromBytes(omServiceId.getBytes(StandardCharsets.UTF_8));
  }

  public String getRatisStorageDir() {
    return ratisStorageDir;
  }

  /** Returns the last applied term and index reported by the state machine. */
  public TermIndex getLastAppliedTermIndex() {
    return omStateMachine.getLastAppliedTermIndex();
  }

  public RaftGroupId getRaftGroupId() {
    return raftGroupId;
  }

  /**
   * Builds the Ratis server TLS parameters.
   *
   * @param conf security configuration
   * @param caClient certificate client providing the key, certificate and CA
   *     list
   * @return the TLS parameters when both security and gRPC TLS are enabled,
   *     otherwise {@code null}
   * @throws IOException if the CA certificate list cannot be built
   */
  private static Parameters createServerTlsParameters(SecurityConfig conf,
      CertificateClient caClient) throws IOException {
    if (conf.isSecurityEnabled() && conf.isGrpcTlsEnabled()) {
      List<X509Certificate> caList = HAUtils.buildCAX509List(caClient,
          conf.getConfiguration());
      GrpcTlsConfig config = new GrpcTlsConfig(
          caClient.getPrivateKey(), caClient.getCertificate(),
          caList, true);
      return RatisHelper.setServerTlsConf(config);
    }

    return null;
  }
}