blob: 65eb13980a44c4113d50896868589ad0538a73b4 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.om.ha;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ServiceException;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.utils.LegacyHadoopConfigurationSource;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.retry.FailoverProxyProvider;
import org.apache.hadoop.io.retry.RetryInvocationHandler;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryPolicy.RetryAction.RetryDecision;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.ozone.OmUtils;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.om.exceptions.OMLeaderNotReadyException;
import org.apache.hadoop.ozone.om.exceptions.OMNotLeaderException;
import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolClientSideTranslatorPB;
import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
/**
* A failover proxy provider implementation which allows clients to configure
* multiple OMs to connect to. In case of OM failover, client can try
* connecting to another OM node from the list of proxies.
*/
public class OMFailoverProxyProvider implements
FailoverProxyProvider<OzoneManagerProtocolPB>, Closeable {
// Logger shared by this provider and the retry policy it creates.
public static final Logger LOG =
LoggerFactory.getLogger(OMFailoverProxyProvider.class);
// Map of OMNodeID to its RPC proxy. Entries are registered with a null value
// and replaced by a real proxy the first time one is created for that node.
private Map<String, ProxyInfo<OzoneManagerProtocolPB>> omProxies;
// Map of OMNodeID to the address information of that OM.
private Map<String, OMProxyInfo> omProxyInfos;
// OMNodeIDs in round robin order; currentProxyIndex indexes into this list.
private List<String> omNodeIDList;
// NodeID of the OM whose proxy is currently handed out by getProxy().
private String currentProxyOMNodeId;
// Position of the current OM within omNodeIDList.
private int currentProxyIndex;
private final ConfigurationSource conf;
// Protocol version of OzoneManagerProtocolPB, passed to RPC when a proxy is
// created.
private final long omVersion;
private final UserGroupInformation ugi;
// Delegation token service computed once in the constructor. It is null when
// no OM reported a delegation token service.
private final Text delegationTokenService;
private final String omServiceId;
// Messages of the exceptions seen by the retry policy. They are written to
// the error log once the allowed number of failovers is exhausted.
private List<String> retryExceptions = new ArrayList<>();
// OMFailoverProxyProvider, on encountering certain exception, tries each OM
// once in a round robin fashion. After that it waits for configured time
// before attempting to contact all the OMs again. For other exceptions
// such as LeaderNotReadyException, the same OM is contacted again with a
// linearly increasing wait time.
private Set<String> attemptedOMs = new HashSet<>();
// OM that was current right before the most recent failover decision.
private String lastAttemptedOM;
// Number of consecutive retries against the same OM; scales the wait time.
private int numAttemptsOnSameOM = 0;
// Base wait time in milliseconds, read from the client configuration.
private final long waitBetweenRetries;
// OMs which have already answered with an access control exception.
private Set<String> accessControlExceptionOMs = new HashSet<>();
/**
 * Creates a provider for the OMs configured under the given service id.
 * The first OM in the loaded node list becomes the current proxy target.
 *
 * @param configuration source of the OM addresses and client retry settings
 * @param ugi user the RPC proxies are created for
 * @param omServiceId OM service id whose nodes should be loaded
 * @throws IOException if loading the OM client configuration fails
 */
public OMFailoverProxyProvider(ConfigurationSource configuration,
    UserGroupInformation ugi, String omServiceId) throws IOException {
  this.conf = configuration;
  this.ugi = ugi;
  this.omServiceId = omServiceId;
  this.omVersion = RPC.getProtocolVersion(OzoneManagerProtocolPB.class);

  // Fills omProxies, omProxyInfos and omNodeIDList; everything below reads
  // those collections, so this has to run first.
  loadOMClientConfigs(this.conf, this.omServiceId);
  this.delegationTokenService = computeDelegationTokenService();

  // Start with the first configured OM.
  this.currentProxyIndex = 0;
  this.currentProxyOMNodeId = this.omNodeIDList.get(this.currentProxyIndex);

  this.waitBetweenRetries = this.conf.getLong(
      OzoneConfigKeys.OZONE_CLIENT_WAIT_BETWEEN_RETRIES_MILLIS_KEY,
      OzoneConfigKeys.OZONE_CLIENT_WAIT_BETWEEN_RETRIES_MILLIS_DEFAULT);
}
/**
 * Reads the OM RPC addresses of the given service from the configuration
 * and (re)initializes the proxy map, the proxy info map and the node id
 * list from them. Proxies themselves are not created here.
 *
 * @param config configuration to read the OM node ids and addresses from
 * @param omSvcId OM service id to load the nodes of
 * @throws IOException declared for overriding implementations
 * @throws IllegalArgumentException if no usable OM address was found
 */
protected void loadOMClientConfigs(ConfigurationSource config, String omSvcId)
    throws IOException {
  this.omProxies = new HashMap<>();
  this.omProxyInfos = new HashMap<>();
  this.omNodeIDList = new ArrayList<>();

  Collection<String> serviceIds = Collections.singletonList(omSvcId);
  for (String serviceId : OmUtils.emptyAsSingletonNull(serviceIds)) {
    Collection<String> nodeIds = OmUtils.getOMNodeIds(config, serviceId);
    for (String nodeId : OmUtils.emptyAsSingletonNull(nodeIds)) {
      String addressKey = OmUtils.addKeySuffixes(OZONE_OM_ADDRESS_KEY,
          serviceId, nodeId);
      String address = OmUtils.getOmRpcAddress(config, addressKey);
      if (address == null) {
        // Nothing configured for this node.
        continue;
      }

      OMProxyInfo info = new OMProxyInfo(serviceId, nodeId, address);
      if (info.getAddress() == null) {
        LOG.error("Failed to create OM proxy for {} at address {}",
            nodeId, address);
        continue;
      }

      // For a non-HA OM setup the node id might be null; register the OM
      // under a dummy id in that case.
      String registeredId =
          (nodeId == null) ? OzoneConsts.OM_NODE_ID_DUMMY : nodeId;
      // The actual proxy is created on the first call to the server.
      omProxies.put(registeredId, null);
      omProxyInfos.put(registeredId, info);
      omNodeIDList.add(registeredId);
    }
  }

  if (omProxies.isEmpty()) {
    throw new IllegalArgumentException("Could not find any configured " +
        "addresses for OM. Please configure the system with "
        + OZONE_OM_ADDRESS_KEY);
  }
}
/**
 * @return the node id of the OM that is currently selected as proxy target
 */
@VisibleForTesting
public synchronized String getCurrentProxyOMNodeId() {
  return this.currentProxyOMNodeId;
}
/**
 * Creates an RPC proxy to the OM listening on the given address.
 *
 * @param omAddress RPC address of the OM
 * @return the protocol proxy for that OM
 * @throws IOException if the RPC layer fails to create the proxy
 */
private OzoneManagerProtocolPB createOMProxy(InetSocketAddress omAddress)
    throws IOException {
  Configuration rpcConf =
      LegacyHadoopConfigurationSource.asHadoopConfiguration(conf);
  RPC.setProtocolEngine(rpcConf, OzoneManagerProtocolPB.class,
      ProtobufRpcEngine.class);

  // With zero allowed failovers the IPC layer does not retry a connection
  // failure on the same OM by itself; the policy effectively behaves like
  // TRY_ONCE_THEN_FAIL and leaves retrying to this provider.
  RetryPolicy noIpcRetries = RetryPolicies.failoverOnNetworkException(0);

  return RPC.getProtocolProxy(
      OzoneManagerProtocolPB.class,
      omVersion,
      omAddress,
      ugi,
      rpcConf,
      NetUtils.getDefaultSocketFactory(rpcConf),
      (int) OmUtils.getOMClientRpcTimeOut(conf),
      noIpcRetries).getProxy();
}
/**
 * Returns the proxy which should be used until the next failover event
 * occurs. The RPC proxy is initialized lazily: if none exists yet for the
 * current OM, it is created on this call.
 *
 * @return the OM proxy object to invoke methods upon
 */
@Override
public synchronized ProxyInfo getProxy() {
  ProxyInfo existing = omProxies.get(currentProxyOMNodeId);
  if (existing != null) {
    return existing;
  }
  return createOMProxy(currentProxyOMNodeId);
}
/**
 * Creates the RPC proxy for the given OM node and stores it in the proxy
 * map under that node id.
 *
 * @param nodeId id of the OM node to create the proxy for; it must be one
 *     of the node ids present in the proxy info map
 * @return the newly created proxy info
 * @throws RuntimeException wrapping the {@link IOException} if the RPC
 *     proxy could not be created
 */
protected ProxyInfo createOMProxy(String nodeId) {
  OMProxyInfo omProxyInfo = omProxyInfos.get(nodeId);
  InetSocketAddress address = omProxyInfo.getAddress();
  ProxyInfo proxyInfo;
  try {
    OzoneManagerProtocolPB proxy = createOMProxy(address);
    // Create proxyInfo here, to make it work with all Hadoop versions.
    // The description identifies this single OM; using the whole
    // omProxyInfos map here would give every proxy the same description
    // listing all OMs.
    proxyInfo = new ProxyInfo<>(proxy, omProxyInfo.toString());
    omProxies.put(nodeId, proxyInfo);
  } catch (IOException ioe) {
    LOG.error("{} Failed to create RPC proxy to OM at {}",
        this.getClass().getSimpleName(), address, ioe);
    throw new RuntimeException(ioe);
  }
  return proxyInfo;
}
/**
 * Builds the retry policy used for calls going through this provider.
 *
 * The policy performs the failover itself before returning
 * FAILOVER_AND_RETRY:
 * - on OMNotLeaderException it moves to the next OM,
 * - on OMLeaderNotReadyException it stays on the current OM,
 * - on any other exception it moves to the next OM, unless
 *   {@link #shouldFailover(Exception)} vetoes the retry.
 * Once {@code maxFailovers} failovers have happened the policy logs all
 * collected exception messages and fails.
 *
 * @param maxFailovers number of failovers after which the policy gives up
 * @return the retry policy
 */
@VisibleForTesting
public RetryPolicy getRetryPolicy(int maxFailovers) {
  // Client will attempt up to maxFailovers number of failovers between
  // available OMs before throwing exception.
  RetryPolicy retryPolicy = new RetryPolicy() {
    @Override
    public RetryAction shouldRetry(Exception exception, int retries,
        int failovers, boolean isIdempotentOrAtMostOnce)
        throws Exception {

      if (LOG.isDebugEnabled()) {
        if (exception.getCause() != null) {
          LOG.debug("RetryProxy: OM {}: {}: {}", getCurrentProxyOMNodeId(),
              exception.getCause().getClass().getSimpleName(),
              exception.getCause().getMessage());
        } else {
          LOG.debug("RetryProxy: OM {}: {}", getCurrentProxyOMNodeId(),
              exception.getMessage());
        }
      }

      retryExceptions.add(getExceptionMsg(exception, failovers));

      if (exception instanceof ServiceException) {
        OMNotLeaderException notLeaderException =
            getNotLeaderException(exception);
        if (notLeaderException != null) {
          // TODO: NotLeaderException should include the host
          //  address of the suggested leader along with the nodeID.
          //  Failing over just based on nodeID is not very robust.

          // OMFailoverProxyProvider#performFailover() is a dummy call and
          // does not perform any failover. Failover manually to the next OM.
          performFailoverToNextProxy();
          return getRetryAction(RetryDecision.FAILOVER_AND_RETRY, failovers);
        }

        OMLeaderNotReadyException leaderNotReadyException =
            getLeaderNotReadyException(exception);
        if (leaderNotReadyException != null) {
          // Retry on same OM again as leader OM is not ready.
          // Failing over to same OM so that wait time between retries is
          // incremented
          performFailoverIfRequired(getCurrentProxyOMNodeId());
          return getRetryAction(RetryDecision.FAILOVER_AND_RETRY, failovers);
        }
      }

      if (!shouldFailover(exception)) {
        // The retry sequence ends here. Drop the collected messages so
        // they neither accumulate nor show up in the error log of a later,
        // unrelated failure.
        retryExceptions.clear();
        return RetryAction.FAIL; // do not retry
      }

      // For all other exceptions, fail over manually to the next OM Node
      // proxy.
      performFailoverToNextProxy();
      return getRetryAction(RetryDecision.FAILOVER_AND_RETRY, failovers);
    }

    /**
     * Returns the given action with the wait time computed by
     * getWaitTime() while failovers are still allowed; otherwise logs the
     * collected exception messages, clears them and returns FAIL.
     */
    private RetryAction getRetryAction(RetryDecision fallbackAction,
        int failovers) {
      if (failovers < maxFailovers) {
        return new RetryAction(fallbackAction, getWaitTime());
      } else {
        StringBuilder allRetryExceptions = new StringBuilder();
        allRetryExceptions.append("\n");
        for (String retryException : retryExceptions) {
          allRetryExceptions.append(retryException).append("\n");
        }
        LOG.error("Failed to connect to OMs: {}. Attempted {} failovers. " +
                "Got following exceptions during retries: {}",
            getOMProxyInfos(), maxFailovers,
            allRetryExceptions.toString());
        retryExceptions.clear();
        return RetryAction.FAIL;
      }
    }
  };

  return retryPolicy;
}
/**
 * @return the delegation token service computed when this provider was
 *     constructed; may be null
 */
public Text getCurrentProxyDelegationToken() {
  return this.delegationTokenService;
}
/**
 * Computes the delegation token service from the loaded OM proxy infos.
 * For HA this is the sorted, "," separated list of the delegation token
 * services of all OMs which have one.
 *
 * @return the combined service name, or null if no OM has a delegation
 *     token service
 */
protected Text computeDelegationTokenService() {
  List<String> services = new ArrayList<>();
  for (OMProxyInfo info : omProxyInfos.values()) {
    Text service = info.getDelegationTokenService();
    // During client object creation, when one of the configured OM
    // addresses is unreachable, its service can be null; skip it.
    if (service == null) {
      continue;
    }
    services.add(service.toString());
  }

  if (services.isEmpty()) {
    // If all OM addresses are unresolvable, set the dt service to null and
    // let this fail in a later step during connection setup.
    return null;
  }

  Collections.sort(services);
  return new Text(String.join(",", services));
}
/**
 * @return the protocol interface the proxies of this provider implement
 */
@Override
public Class<OzoneManagerProtocolPB> getInterface() {
  final Class<OzoneManagerProtocolPB> protocol = OzoneManagerProtocolPB.class;
  return protocol;
}
/**
 * Called whenever an error warrants failing over, as determined by the
 * retry policy.
 *
 * This is a dummy call from {@link RetryInvocationHandler}: it only emits a
 * debug log and does not change the current proxy. The actual failover has
 * to be performed using either
 * {@link OMFailoverProxyProvider#performFailoverIfRequired(String)} or
 * {@link OMFailoverProxyProvider#performFailoverToNextProxy()}.
 *
 * In {@link OzoneManagerProtocolClientSideTranslatorPB}, we first fail over
 * manually and then return the RetryAction FAILOVER_AND_RETRY. This is done
 * because we do not always want to fail over to the next proxy. If we get
 * an OMNotLeaderException with a suggested leader, we want to fail over to
 * that OM proxy instead. Hence the manual failover, and the
 * {@link FailoverProxyProvider#performFailover(Object)} call should not
 * fail over again.
 *
 * @param currentProxy the proxy in use when the failure happened; unused
 */
@Override
public void performFailover(OzoneManagerProtocolPB currentProxy) {
  if (!LOG.isDebugEnabled()) {
    return;
  }
  int index = getCurrentProxyIndex();
  String nodeId = omNodeIDList.get(index);
  LOG.debug("Failing over OM proxy to index: {}, nodeId: {}",
      index, nodeId);
}
/**
 * Performs failover if the leaderOMNodeId returned through OMResponse does
 * not match the current leaderOMNodeId cached by the proxy provider. If no
 * leader is suggested, fails over to the next OM in the list instead.
 *
 * @param newLeaderOMNodeId node id of the suggested leader, or null if
 *     there is no suggestion
 */
public void performFailoverIfRequired(String newLeaderOMNodeId) {
  if (newLeaderOMNodeId != null) {
    boolean switched = updateLeaderOMNodeId(newLeaderOMNodeId);
    if (switched) {
      LOG.debug("Failing over OM proxy to nodeId: {}", newLeaderOMNodeId);
    }
    return;
  }

  LOG.debug("No suggested leader nodeId. Performing failover to next peer" +
      " node");
  performFailoverToNextProxy();
}
/**
 * Fails over to the next OM in the node list, wrapping around at the end
 * of the list, regardless of which OM might be the leader.
 */
public void performFailoverToNextProxy() {
  final int nextIndex = incrementProxyIndex();
  if (!LOG.isDebugEnabled()) {
    return;
  }
  String nextNodeId = omNodeIDList.get(nextIndex);
  LOG.debug("Incrementing OM proxy index to {}, nodeId: {}",
      nextIndex, nextNodeId);
}
/**
 * Update the proxy index to the next proxy in the list, wrapping around to
 * the first one after the last. The OM being left is recorded as the last
 * attempted OM and added to the set of attempted OMs.
 *
 * @return the new proxy index
 */
private synchronized int incrementProxyIndex() {
  // Before failing over to next proxy, add the proxy OM (which has
  // returned an exception) to the list of attemptedOMs.
  lastAttemptedOM = currentProxyOMNodeId;
  attemptedOMs.add(currentProxyOMNodeId);

  // The index is used on omNodeIDList, so wrap around on the size of that
  // list. Wrapping on the size of the proxy map could run out of bounds or
  // skip entries if the two collections ever differ in size.
  currentProxyIndex = (currentProxyIndex + 1) % omNodeIDList.size();
  currentProxyOMNodeId = omNodeIDList.get(currentProxyIndex);
  return currentProxyIndex;
}
/**
 * Failover to the OM proxy specified by the new leader OMNodeId.
 *
 * If the given node is already the current one, only the last attempted OM
 * is updated. If the given node is not known to this provider, nothing
 * changes.
 *
 * @param newLeaderOMNodeId OMNodeId to failover to.
 * @return true if the current proxy was switched to the given node, false
 *     otherwise.
 */
synchronized boolean updateLeaderOMNodeId(String newLeaderOMNodeId) {
  if (currentProxyOMNodeId.equals(newLeaderOMNodeId)) {
    // Staying on the same OM still counts as an attempt on it.
    lastAttemptedOM = currentProxyOMNodeId;
    return false;
  }

  if (!omProxies.containsKey(newLeaderOMNodeId)) {
    return false;
  }

  lastAttemptedOM = currentProxyOMNodeId;
  currentProxyOMNodeId = newLeaderOMNodeId;
  currentProxyIndex = omNodeIDList.indexOf(newLeaderOMNodeId);
  return true;
}
/**
 * @return the index of the current OM within the node id list
 */
private synchronized int getCurrentProxyIndex() {
  return this.currentProxyIndex;
}
/**
 * Computes how long to wait before the next retry.
 *
 * - If the next retry goes to the same OM as the last attempt, the wait
 *   grows linearly with the number of consecutive attempts on that OM.
 * - Otherwise there is no wait until every known OM has been attempted;
 *   once all have been attempted, the configured wait is returned and the
 *   round starts over.
 *
 * Calling this method updates the retry bookkeeping.
 *
 * @return the wait time, in the unit of the configured wait between retries
 */
public synchronized long getWaitTime() {
  boolean retryingSameOM = currentProxyOMNodeId.equals(lastAttemptedOM);
  if (retryingSameOM) {
    // The round robin has been broken, so forget the attempted OMs.
    attemptedOMs.clear();
    // The same OM will be contacted again. So wait and then retry.
    numAttemptsOnSameOM++;
    return waitBetweenRetries * numAttemptsOnSameOM;
  }

  // We failed over to a different OM, so the same-OM counter starts over.
  numAttemptsOnSameOM = 0;

  // OMs are being contacted in a round robin way. As long as some OM has
  // not been contacted in this round, retry immediately.
  boolean allAttempted = attemptedOMs.containsAll(omProxyInfos.keySet());
  if (!allAttempted) {
    return 0;
  }

  // All the OMs have been contacted once. Clear the set, as a wait is
  // injected now and the next round should not include these attempts.
  attemptedOMs.clear();
  return waitBetweenRetries;
}
/**
 * Decides whether the given exception should lead to a failover.
 *
 * Exceptions other than access control exceptions always do. For access
 * control exceptions each OM is tried once: the answer is false as soon as
 * the current OM has already reported one, or once all OMs of the node
 * list have reported one.
 *
 * @param ex the exception returned by the current OM
 * @return true if the client should fail over and retry, false otherwise
 */
public synchronized boolean shouldFailover(Exception ex) {
  if (!OmUtils.isAccessControlException(ex)) {
    return true;
  }

  // Retry all available OMs once before failing with
  // AccessControlException.
  if (accessControlExceptionOMs.contains(currentProxyOMNodeId)) {
    accessControlExceptionOMs.clear();
    return false;
  }

  accessControlExceptionOMs.add(currentProxyOMNodeId);
  return !accessControlExceptionOMs.containsAll(omNodeIDList);
}
/**
 * Close all the proxy objects which have been opened over the lifetime of
 * the proxy provider. Nodes for which no proxy was ever created are
 * skipped.
 */
@Override
public synchronized void close() throws IOException {
  for (ProxyInfo<OzoneManagerProtocolPB> opened : omProxies.values()) {
    if (opened == null) {
      continue;
    }
    RPC.stopProxy(opened.proxy);
  }
}
/**
 * @return a new list holding the current values of the proxy map; entries
 *     are null for nodes whose proxy has not been created yet
 */
@VisibleForTesting
public List<ProxyInfo> getOMProxies() {
  List<ProxyInfo> proxies = new ArrayList<ProxyInfo>();
  proxies.addAll(omProxies.values());
  return proxies;
}
/**
 * @return the internal map of OM node id to proxy; this is the live map,
 *     not a copy
 */
@VisibleForTesting
public Map<String, ProxyInfo<OzoneManagerProtocolPB>> getOMProxyMap() {
  return this.omProxies;
}
/**
 * @return a new list holding the proxy infos of all loaded OMs
 */
@VisibleForTesting
public List<OMProxyInfo> getOMProxyInfos() {
  List<OMProxyInfo> infos = new ArrayList<OMProxyInfo>();
  infos.addAll(omProxyInfos.values());
  return infos;
}
/**
 * Formats a one-line description of an exception seen during a retry.
 * The cause of the exception is described if there is one, otherwise the
 * exception itself.
 *
 * @param e the exception that triggered the retry
 * @param retryAttempt number reported as the retry attempt
 * @return "Retry Attempt N Exception - canonical class name: message"
 */
private static String getExceptionMsg(Exception e, int retryAttempt) {
  Throwable described = (e.getCause() == null) ? e : e.getCause();
  return "Retry Attempt " + retryAttempt + " Exception - "
      + described.getClass().getCanonicalName()
      + ": "
      + described.getMessage();
}
/**
 * Extracts an OMLeaderNotReadyException from the given exception, if its
 * cause is a RemoteException wrapping one.
 *
 * @param exception the exception to inspect
 * @return the unwrapped OMLeaderNotReadyException, or null if the
 *     exception does not carry one
 */
private static OMLeaderNotReadyException getLeaderNotReadyException(
    Exception exception) {
  Throwable cause = exception.getCause();
  if (!(cause instanceof RemoteException)) {
    return null;
  }
  IOException unwrapped = ((RemoteException) cause).unwrapRemoteException();
  return (unwrapped instanceof OMLeaderNotReadyException)
      ? (OMLeaderNotReadyException) unwrapped
      : null;
}
/**
 * Extracts an OMNotLeaderException from the given exception, if its cause
 * is a RemoteException wrapping one.
 *
 * @param exception the exception to inspect
 * @return the unwrapped OMNotLeaderException, or null if the exception
 *     does not carry one
 */
public static OMNotLeaderException getNotLeaderException(
    Exception exception) {
  Throwable cause = exception.getCause();
  if (!(cause instanceof RemoteException)) {
    return null;
  }
  IOException unwrapped = ((RemoteException) cause).unwrapRemoteException();
  return (unwrapped instanceof OMNotLeaderException)
      ? (OMNotLeaderException) unwrapped
      : null;
}
/**
 * Replaces the proxy map, the proxy info map and the node id list with the
 * given instances. The current proxy node id and index are left untouched.
 *
 * @param testOMProxies map of OM node id to proxy
 * @param testOMProxyInfos map of OM node id to proxy info
 * @param testOMNodeIDList list of OM node ids
 */
@VisibleForTesting
protected void setProxiesForTesting(
    Map<String, ProxyInfo<OzoneManagerProtocolPB>> testOMProxies,
    Map<String, OMProxyInfo> testOMProxyInfos,
    List<String> testOMNodeIDList) {
  this.omNodeIDList = testOMNodeIDList;
  this.omProxyInfos = testOMProxyInfos;
  this.omProxies = testOMProxies;
}
}