blob: d6d2be6ed8e654419a769854a69c202c1bb70aa8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.om.ratis;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.StorageUnit;
import org.apache.hadoop.hdds.server.ServerUtils;
import org.apache.hadoop.hdds.tracing.TracingUtil;
import org.apache.hadoop.ozone.om.OMConfigKeys;
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.exceptions.OMLeaderNotReadyException;
import org.apache.hadoop.ozone.om.exceptions.OMNotLeaderException;
import org.apache.hadoop.ozone.om.ha.OMNodeDetails;
import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.ServiceException;
import org.apache.ratis.RaftConfigKeys;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.grpc.GrpcConfigKeys;
import org.apache.ratis.netty.NettyConfigKeys;
import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
import org.apache.ratis.proto.RaftProtos.RoleInfoProto;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.GroupInfoReply;
import org.apache.ratis.protocol.GroupInfoRequest;
import org.apache.ratis.protocol.LeaderNotReadyException;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.NotLeaderException;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.StateMachineException;
import org.apache.ratis.rpc.RpcType;
import org.apache.ratis.rpc.SupportedRpcType;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.util.LifeCycle;
import org.apache.ratis.util.SizeInBytes;
import org.apache.ratis.util.StringUtils;
import org.apache.ratis.util.TimeDuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Creates a Ratis server endpoint for OM.
*/
public final class OzoneManagerRatisServer {
private static final Logger LOG = LoggerFactory
.getLogger(OzoneManagerRatisServer.class);
private final int port;
private final InetSocketAddress omRatisAddress;
private final RaftServer server;
private final RaftGroupId raftGroupId;
private final RaftGroup raftGroup;
private final RaftPeerId raftPeerId;
private final OzoneManager ozoneManager;
private final OzoneManagerStateMachine omStateMachine;
private final ClientId clientId = ClientId.randomId();
private final ScheduledExecutorService scheduledRoleChecker;
private long roleCheckInitialDelayMs = 1000; // 1 second default
private long roleCheckIntervalMs;
private ReentrantReadWriteLock roleCheckLock = new ReentrantReadWriteLock();
private Optional<RaftPeerRole> cachedPeerRole = Optional.empty();
private Optional<RaftPeerId> cachedLeaderPeerId = Optional.empty();
private static final AtomicLong CALL_ID_COUNTER = new AtomicLong();
private static long nextCallId() {
return CALL_ID_COUNTER.getAndIncrement() & Long.MAX_VALUE;
}
/**
* Submit request to Ratis server.
* @param omRequest
* @return OMResponse - response returned to the client.
* @throws ServiceException
*/
public OMResponse submitRequest(OMRequest omRequest) throws ServiceException {
RaftClientRequest raftClientRequest =
createWriteRaftClientRequest(omRequest);
RaftClientReply raftClientReply;
try {
raftClientReply = server.submitClientRequestAsync(raftClientRequest)
.get();
} catch (Exception ex) {
throw new ServiceException(ex.getMessage(), ex);
}
return processReply(omRequest, raftClientReply);
}
/**
* Create Write RaftClient request from OMRequest.
* @param omRequest
* @return RaftClientRequest - Raft Client request which is submitted to
* ratis server.
*/
private RaftClientRequest createWriteRaftClientRequest(OMRequest omRequest) {
return new RaftClientRequest(clientId, server.getId(), raftGroupId,
nextCallId(),
Message.valueOf(OMRatisHelper.convertRequestToByteString(omRequest)),
RaftClientRequest.writeRequestType(), null);
}
/**
* Process the raftClientReply and return OMResponse.
* @param omRequest
* @param reply
* @return OMResponse - response which is returned to client.
* @throws ServiceException
*/
private OMResponse processReply(OMRequest omRequest, RaftClientReply reply)
throws ServiceException {
// NotLeader exception is thrown only when the raft server to which the
// request is submitted is not the leader. This can happen first time
// when client is submitting request to OM.
if (!reply.isSuccess()) {
NotLeaderException notLeaderException = reply.getNotLeaderException();
if (notLeaderException != null) {
throw new ServiceException(
OMNotLeaderException.convertToOMNotLeaderException(
notLeaderException, getRaftPeerId()));
}
LeaderNotReadyException leaderNotReadyException =
reply.getLeaderNotReadyException();
if (leaderNotReadyException != null) {
throw new ServiceException(new OMLeaderNotReadyException(
leaderNotReadyException.getMessage()));
}
StateMachineException stateMachineException =
reply.getStateMachineException();
if (stateMachineException != null) {
OMResponse.Builder omResponse = OMResponse.newBuilder()
.setCmdType(omRequest.getCmdType())
.setSuccess(false)
.setTraceID(omRequest.getTraceID());
if (stateMachineException.getCause() != null) {
omResponse.setMessage(stateMachineException.getCause().getMessage());
omResponse.setStatus(
exceptionToResponseStatus(stateMachineException.getCause()));
} else {
// Current Ratis is setting cause, this is an safer side check.
LOG.error("StateMachine exception cause is not set");
omResponse.setStatus(
OzoneManagerProtocolProtos.Status.INTERNAL_ERROR);
omResponse.setMessage(
StringUtils.stringifyException(stateMachineException));
}
if (LOG.isDebugEnabled()) {
LOG.debug("Error while executing ratis request. " +
"stateMachineException: ", stateMachineException);
}
return omResponse.build();
}
}
try {
return OMRatisHelper.getOMResponseFromRaftClientReply(reply);
} catch (InvalidProtocolBufferException ex) {
if (ex.getMessage() != null) {
throw new ServiceException(ex.getMessage(), ex);
} else {
throw new ServiceException(ex);
}
}
// TODO: Still need to handle RaftRetry failure exception and
// NotReplicated exception.
}
/**
* Convert exception to {@link OzoneManagerProtocolProtos.Status}.
* @param cause - Cause from stateMachine exception
* @return {@link OzoneManagerProtocolProtos.Status}
*/
private OzoneManagerProtocolProtos.Status exceptionToResponseStatus(
Throwable cause) {
if (cause instanceof OMException) {
return OzoneManagerProtocolProtos.Status.values()[
((OMException) cause).getResult().ordinal()];
} else {
LOG.error("Unknown error occurs", cause);
return OzoneManagerProtocolProtos.Status.INTERNAL_ERROR;
}
}
/**
* Returns an OM Ratis server.
* @param conf configuration
* @param om the OM instance starting the ratis server
* @param raftGroupIdStr raft group id string
* @param localRaftPeerId raft peer id of this Ratis server
* @param addr address of the ratis server
* @param raftPeers peer nodes in the raft ring
* @throws IOException
*/
private OzoneManagerRatisServer(ConfigurationSource conf,
OzoneManager om,
String raftGroupIdStr, RaftPeerId localRaftPeerId,
InetSocketAddress addr, List<RaftPeer> raftPeers)
throws IOException {
this.ozoneManager = om;
this.omRatisAddress = addr;
this.port = addr.getPort();
RaftProperties serverProperties = newRaftProperties(conf);
this.raftPeerId = localRaftPeerId;
this.raftGroupId = RaftGroupId.valueOf(
getRaftGroupIdFromOmServiceId(raftGroupIdStr));
this.raftGroup = RaftGroup.valueOf(raftGroupId, raftPeers);
StringBuilder raftPeersStr = new StringBuilder();
for (RaftPeer peer : raftPeers) {
raftPeersStr.append(", ").append(peer.getAddress());
}
LOG.info("Instantiating OM Ratis server with GroupID: {} and " +
"Raft Peers: {}", raftGroupIdStr, raftPeersStr.toString().substring(2));
this.omStateMachine = getStateMachine(conf);
this.server = RaftServer.newBuilder()
.setServerId(this.raftPeerId)
.setGroup(this.raftGroup)
.setProperties(serverProperties)
.setStateMachine(omStateMachine)
.build();
// Run a scheduler to check and update the server role on the leader
// periodically
this.scheduledRoleChecker = Executors.newSingleThreadScheduledExecutor();
this.scheduledRoleChecker.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
// Run this check only on the leader OM
if (cachedPeerRole.isPresent() &&
cachedPeerRole.get() == RaftPeerRole.LEADER) {
updateServerRole();
}
}
}, roleCheckInitialDelayMs, roleCheckIntervalMs, TimeUnit.MILLISECONDS);
}
/**
* Creates an instance of OzoneManagerRatisServer.
*/
public static OzoneManagerRatisServer newOMRatisServer(
ConfigurationSource ozoneConf, OzoneManager omProtocol,
OMNodeDetails omNodeDetails, List<OMNodeDetails> peerNodes)
throws IOException {
// RaftGroupId is the omServiceId
String omServiceId = omNodeDetails.getOMServiceId();
String omNodeId = omNodeDetails.getOMNodeId();
RaftPeerId localRaftPeerId = RaftPeerId.getRaftPeerId(omNodeId);
InetSocketAddress ratisAddr = new InetSocketAddress(
omNodeDetails.getInetAddress(), omNodeDetails.getRatisPort());
RaftPeer localRaftPeer = new RaftPeer(localRaftPeerId, ratisAddr);
List<RaftPeer> raftPeers = new ArrayList<>();
// Add this Ratis server to the Ratis ring
raftPeers.add(localRaftPeer);
for (OMNodeDetails peerInfo : peerNodes) {
String peerNodeId = peerInfo.getOMNodeId();
RaftPeerId raftPeerId = RaftPeerId.valueOf(peerNodeId);
RaftPeer raftPeer;
if (peerInfo.isHostUnresolved()) {
raftPeer = new RaftPeer(raftPeerId, peerInfo.getRatisHostPortStr());
} else {
InetSocketAddress peerRatisAddr = new InetSocketAddress(
peerInfo.getInetAddress(), peerInfo.getRatisPort());
raftPeer = new RaftPeer(raftPeerId, peerRatisAddr);
}
// Add other OM nodes belonging to the same OM service to the Ratis ring
raftPeers.add(raftPeer);
}
return new OzoneManagerRatisServer(ozoneConf, omProtocol, omServiceId,
localRaftPeerId, ratisAddr, raftPeers);
}
public RaftGroup getRaftGroup() {
return this.raftGroup;
}
/**
* Initializes and returns OzoneManager StateMachine.
*/
private OzoneManagerStateMachine getStateMachine(ConfigurationSource conf)
throws IOException {
return new OzoneManagerStateMachine(this,
TracingUtil.isTracingEnabled(conf));
}
@VisibleForTesting
public OzoneManagerStateMachine getOmStateMachine() {
return omStateMachine;
}
public OzoneManager getOzoneManager() {
return ozoneManager;
}
/**
* Start the Ratis server.
* @throws IOException
*/
public void start() throws IOException {
LOG.info("Starting {} {} at port {}", getClass().getSimpleName(),
server.getId(), port);
server.start();
}
public void stop() {
try {
server.close();
omStateMachine.stop();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
//TODO simplify it to make it shorter
@SuppressWarnings("methodlength")
private RaftProperties newRaftProperties(ConfigurationSource conf) {
final RaftProperties properties = new RaftProperties();
// Set RPC type
final String rpcType = conf.get(
OMConfigKeys.OZONE_OM_RATIS_RPC_TYPE_KEY,
OMConfigKeys.OZONE_OM_RATIS_RPC_TYPE_DEFAULT);
final RpcType rpc = SupportedRpcType.valueOfIgnoreCase(rpcType);
RaftConfigKeys.Rpc.setType(properties, rpc);
// Set the ratis port number
if (rpc == SupportedRpcType.GRPC) {
GrpcConfigKeys.Server.setPort(properties, port);
} else if (rpc == SupportedRpcType.NETTY) {
NettyConfigKeys.Server.setPort(properties, port);
}
// Set Ratis storage directory
String storageDir = OzoneManagerRatisServer.getOMRatisDirectory(conf);
RaftServerConfigKeys.setStorageDir(properties,
Collections.singletonList(new File(storageDir)));
// Set RAFT segment size
final int raftSegmentSize = (int) conf.getStorageSize(
OMConfigKeys.OZONE_OM_RATIS_SEGMENT_SIZE_KEY,
OMConfigKeys.OZONE_OM_RATIS_SEGMENT_SIZE_DEFAULT,
StorageUnit.BYTES);
RaftServerConfigKeys.Log.setSegmentSizeMax(properties,
SizeInBytes.valueOf(raftSegmentSize));
RaftServerConfigKeys.Log.setPurgeUptoSnapshotIndex(properties, true);
// Set RAFT segment pre-allocated size
final int raftSegmentPreallocatedSize = (int) conf.getStorageSize(
OMConfigKeys.OZONE_OM_RATIS_SEGMENT_PREALLOCATED_SIZE_KEY,
OMConfigKeys.OZONE_OM_RATIS_SEGMENT_PREALLOCATED_SIZE_DEFAULT,
StorageUnit.BYTES);
int logAppenderQueueNumElements = conf.getInt(
OMConfigKeys.OZONE_OM_RATIS_LOG_APPENDER_QUEUE_NUM_ELEMENTS,
OMConfigKeys.OZONE_OM_RATIS_LOG_APPENDER_QUEUE_NUM_ELEMENTS_DEFAULT);
final int logAppenderQueueByteLimit = (int) conf.getStorageSize(
OMConfigKeys.OZONE_OM_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT,
OMConfigKeys.OZONE_OM_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT_DEFAULT,
StorageUnit.BYTES);
RaftServerConfigKeys.Log.Appender.setBufferElementLimit(properties,
logAppenderQueueNumElements);
RaftServerConfigKeys.Log.Appender.setBufferByteLimit(properties,
SizeInBytes.valueOf(logAppenderQueueByteLimit));
RaftServerConfigKeys.Log.setPreallocatedSize(properties,
SizeInBytes.valueOf(raftSegmentPreallocatedSize));
RaftServerConfigKeys.Log.Appender.setInstallSnapshotEnabled(properties,
false);
final int logPurgeGap = conf.getInt(
OMConfigKeys.OZONE_OM_RATIS_LOG_PURGE_GAP,
OMConfigKeys.OZONE_OM_RATIS_LOG_PURGE_GAP_DEFAULT);
RaftServerConfigKeys.Log.setPurgeGap(properties, logPurgeGap);
// For grpc set the maximum message size
// TODO: calculate the optimal max message size
GrpcConfigKeys.setMessageSizeMax(properties,
SizeInBytes.valueOf(logAppenderQueueByteLimit));
// Set the server request timeout
TimeUnit serverRequestTimeoutUnit =
OMConfigKeys.OZONE_OM_RATIS_SERVER_REQUEST_TIMEOUT_DEFAULT.getUnit();
long serverRequestTimeoutDuration = conf.getTimeDuration(
OMConfigKeys.OZONE_OM_RATIS_SERVER_REQUEST_TIMEOUT_KEY,
OMConfigKeys.OZONE_OM_RATIS_SERVER_REQUEST_TIMEOUT_DEFAULT
.getDuration(), serverRequestTimeoutUnit);
final TimeDuration serverRequestTimeout = TimeDuration.valueOf(
serverRequestTimeoutDuration, serverRequestTimeoutUnit);
RaftServerConfigKeys.Rpc.setRequestTimeout(properties,
serverRequestTimeout);
// Set timeout for server retry cache entry
TimeUnit retryCacheTimeoutUnit = OMConfigKeys
.OZONE_OM_RATIS_SERVER_RETRY_CACHE_TIMEOUT_DEFAULT.getUnit();
long retryCacheTimeoutDuration = conf.getTimeDuration(
OMConfigKeys.OZONE_OM_RATIS_SERVER_RETRY_CACHE_TIMEOUT_KEY,
OMConfigKeys.OZONE_OM_RATIS_SERVER_RETRY_CACHE_TIMEOUT_DEFAULT
.getDuration(), retryCacheTimeoutUnit);
final TimeDuration retryCacheTimeout = TimeDuration.valueOf(
retryCacheTimeoutDuration, retryCacheTimeoutUnit);
RaftServerConfigKeys.RetryCache.setExpiryTime(properties,
retryCacheTimeout);
// Set the server min and max timeout
TimeUnit serverMinTimeoutUnit =
OMConfigKeys.OZONE_OM_RATIS_MINIMUM_TIMEOUT_DEFAULT.getUnit();
long serverMinTimeoutDuration = conf.getTimeDuration(
OMConfigKeys.OZONE_OM_RATIS_MINIMUM_TIMEOUT_KEY,
OMConfigKeys.OZONE_OM_RATIS_MINIMUM_TIMEOUT_DEFAULT
.getDuration(), serverMinTimeoutUnit);
final TimeDuration serverMinTimeout = TimeDuration.valueOf(
serverMinTimeoutDuration, serverMinTimeoutUnit);
long serverMaxTimeoutDuration =
serverMinTimeout.toLong(TimeUnit.MILLISECONDS) + 200;
final TimeDuration serverMaxTimeout = TimeDuration.valueOf(
serverMaxTimeoutDuration, serverMinTimeoutUnit);
RaftServerConfigKeys.Rpc.setTimeoutMin(properties,
serverMinTimeout);
RaftServerConfigKeys.Rpc.setTimeoutMax(properties,
serverMaxTimeout);
// Set the number of maximum cached segments
RaftServerConfigKeys.Log.setSegmentCacheNumMax(properties, 2);
// TODO: set max write buffer size
// Set the ratis leader election timeout
TimeUnit leaderElectionMinTimeoutUnit =
OMConfigKeys.OZONE_OM_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_DEFAULT
.getUnit();
long leaderElectionMinTimeoutduration = conf.getTimeDuration(
OMConfigKeys.OZONE_OM_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_KEY,
OMConfigKeys.OZONE_OM_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_DEFAULT
.getDuration(), leaderElectionMinTimeoutUnit);
final TimeDuration leaderElectionMinTimeout = TimeDuration.valueOf(
leaderElectionMinTimeoutduration, leaderElectionMinTimeoutUnit);
RaftServerConfigKeys.Rpc.setTimeoutMin(properties,
leaderElectionMinTimeout);
long leaderElectionMaxTimeout = leaderElectionMinTimeout.toLong(
TimeUnit.MILLISECONDS) + 200;
RaftServerConfigKeys.Rpc.setTimeoutMax(properties,
TimeDuration.valueOf(leaderElectionMaxTimeout, TimeUnit.MILLISECONDS));
TimeUnit nodeFailureTimeoutUnit =
OMConfigKeys.OZONE_OM_RATIS_SERVER_FAILURE_TIMEOUT_DURATION_DEFAULT
.getUnit();
long nodeFailureTimeoutDuration = conf.getTimeDuration(
OMConfigKeys.OZONE_OM_RATIS_SERVER_FAILURE_TIMEOUT_DURATION_KEY,
OMConfigKeys.OZONE_OM_RATIS_SERVER_FAILURE_TIMEOUT_DURATION_DEFAULT
.getDuration(), nodeFailureTimeoutUnit);
final TimeDuration nodeFailureTimeout = TimeDuration.valueOf(
nodeFailureTimeoutDuration, nodeFailureTimeoutUnit);
RaftServerConfigKeys.Notification.setNoLeaderTimeout(properties,
nodeFailureTimeout);
RaftServerConfigKeys.Rpc.setSlownessTimeout(properties,
nodeFailureTimeout);
TimeUnit roleCheckIntervalUnit =
OMConfigKeys.OZONE_OM_RATIS_SERVER_ROLE_CHECK_INTERVAL_DEFAULT
.getUnit();
long roleCheckIntervalDuration = conf.getTimeDuration(
OMConfigKeys.OZONE_OM_RATIS_SERVER_ROLE_CHECK_INTERVAL_KEY,
OMConfigKeys.OZONE_OM_RATIS_SERVER_ROLE_CHECK_INTERVAL_DEFAULT
.getDuration(), nodeFailureTimeoutUnit);
this.roleCheckIntervalMs = TimeDuration.valueOf(
roleCheckIntervalDuration, roleCheckIntervalUnit)
.toLong(TimeUnit.MILLISECONDS);
this.roleCheckInitialDelayMs = leaderElectionMinTimeout
.toLong(TimeUnit.MILLISECONDS);
// Set auto trigger snapshot. We don't need to configure auto trigger
// threshold in OM, as last applied index is flushed during double buffer
// flush automatically. (But added this property internally, so that this
// helps during testing, when want to trigger snapshots frequently, and
// which will purge logs when purge gap condition is satisfied and which
// will trigger installSnapshot when logs are cleaned up.)
// The transaction info value in OM DB is used as
// snapshot value after restart.
RaftServerConfigKeys.Snapshot.setAutoTriggerEnabled(
properties, true);
long snapshotAutoTriggerThreshold = conf.getLong(
OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_AUTO_TRIGGER_THRESHOLD_KEY,
OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_AUTO_TRIGGER_THRESHOLD_DEFAULT);
RaftServerConfigKeys.Snapshot.setAutoTriggerThreshold(properties,
snapshotAutoTriggerThreshold);
return properties;
}
/**
* Check the cached leader status.
* @return true if cached role is Leader, false otherwise.
*/
private boolean checkCachedPeerRoleIsLeader() {
this.roleCheckLock.readLock().lock();
try {
if (cachedPeerRole.isPresent() &&
cachedPeerRole.get() == RaftPeerRole.LEADER) {
return true;
}
return false;
} finally {
this.roleCheckLock.readLock().unlock();
}
}
/**
* Check if the current OM node is the leader node.
* @return true if Leader, false otherwise.
*/
public boolean isLeader() {
if (checkCachedPeerRoleIsLeader()) {
return true;
}
// Get the server role from ratis server and update the cached values.
updateServerRole();
// After updating the server role, check and return if leader or not.
return checkCachedPeerRoleIsLeader();
}
/**
* Get the suggested leader peer id.
* @return RaftPeerId of the suggested leader node.
*/
public Optional<RaftPeerId> getCachedLeaderPeerId() {
this.roleCheckLock.readLock().lock();
try {
return cachedLeaderPeerId;
} finally {
this.roleCheckLock.readLock().unlock();
}
}
/**
* Get the gorup info (peer role and leader peer id) from Ratis server and
* update the OM server role.
*/
public void updateServerRole() {
try {
GroupInfoReply groupInfo = getGroupInfo();
RoleInfoProto roleInfoProto = groupInfo.getRoleInfoProto();
RaftPeerRole thisNodeRole = roleInfoProto.getRole();
if (thisNodeRole.equals(RaftPeerRole.LEADER)) {
setServerRole(thisNodeRole, raftPeerId);
} else if (thisNodeRole.equals(RaftPeerRole.FOLLOWER)) {
ByteString leaderNodeId = roleInfoProto.getFollowerInfo()
.getLeaderInfo().getId().getId();
// There may be a chance, here we get leaderNodeId as null. For
// example, in 3 node OM Ratis, if 2 OM nodes are down, there will
// be no leader.
RaftPeerId leaderPeerId = null;
if (leaderNodeId != null && !leaderNodeId.isEmpty()) {
leaderPeerId = RaftPeerId.valueOf(leaderNodeId);
}
setServerRole(thisNodeRole, leaderPeerId);
} else {
setServerRole(thisNodeRole, null);
}
} catch (IOException e) {
LOG.error("Failed to retrieve RaftPeerRole. Setting cached role to " +
"{} and resetting leader info.", RaftPeerRole.UNRECOGNIZED, e);
setServerRole(null, null);
}
}
/**
* Set the current server role and the leader peer id.
*/
private void setServerRole(RaftPeerRole currentRole,
RaftPeerId leaderPeerId) {
this.roleCheckLock.writeLock().lock();
try {
this.cachedPeerRole = Optional.ofNullable(currentRole);
this.cachedLeaderPeerId = Optional.ofNullable(leaderPeerId);
} finally {
this.roleCheckLock.writeLock().unlock();
}
}
private GroupInfoReply getGroupInfo() throws IOException {
GroupInfoRequest groupInfoRequest = new GroupInfoRequest(clientId,
raftPeerId, raftGroupId, nextCallId());
GroupInfoReply groupInfo = server.getGroupInfo(groupInfoRequest);
return groupInfo;
}
public int getServerPort() {
return port;
}
@VisibleForTesting
public LifeCycle.State getServerState() {
return server.getLifeCycleState();
}
@VisibleForTesting
public RaftPeerId getRaftPeerId() {
return this.raftPeerId;
}
private UUID getRaftGroupIdFromOmServiceId(String omServiceId) {
return UUID.nameUUIDFromBytes(omServiceId.getBytes(StandardCharsets.UTF_8));
}
/**
* Get the local directory where ratis logs will be stored.
*/
public static String getOMRatisDirectory(ConfigurationSource conf) {
String storageDir = conf.get(OMConfigKeys.OZONE_OM_RATIS_STORAGE_DIR);
if (Strings.isNullOrEmpty(storageDir)) {
storageDir = ServerUtils.getDefaultRatisDirectory(conf);
}
return storageDir;
}
public static String getOMRatisSnapshotDirectory(ConfigurationSource conf) {
String snapshotDir = conf.get(OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_DIR);
if (Strings.isNullOrEmpty(snapshotDir)) {
snapshotDir = Paths.get(getOMRatisDirectory(conf),
"snapshot").toString();
}
return snapshotDir;
}
public TermIndex getLastAppliedTermIndex() {
return omStateMachine.getLastAppliedTermIndex();
}
}