blob: b0991a6a526309c4cded436d721c51fcb2a140d9 [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.utils.LegacyHadoopConfigurationSource;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ozone.OmUtils;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConsts;
import static;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * A failover proxy provider implementation which allows clients to configure
 * multiple OMs to connect to. In case of OM failover, the client can try
 * connecting to another OM node from the list of proxies.
 */
public class OMFailoverProxyProvider implements
FailoverProxyProvider<OzoneManagerProtocolPB>, Closeable {
public static final Logger LOG =
// Map of OMNodeID to its proxy
private Map<String, ProxyInfo<OzoneManagerProtocolPB>> omProxies;
// Map of OMNodeID to the OMProxyInfo whose address is used when the RPC
// proxy for that node is created.
private Map<String, OMProxyInfo> omProxyInfos;
// OM node ids; currentProxyIndex is an index into this list.
private List<String> omNodeIDList;
// Node id of the OM whose proxy getProxy() currently hands out.
private String currentProxyOMNodeId;
// Position of currentProxyOMNodeId in omNodeIDList.
private int currentProxyIndex;
// Configuration, protocol version and user passed along when an RPC proxy
// is created.
private final ConfigurationSource conf;
private final long omVersion;
private final UserGroupInformation ugi;
// Delegation token service, computed once in the constructor.
private final Text delegationTokenService;
// OM service id given to the constructor; the two-argument constructor
// passes null.
private final String omServiceId;
// OMFailoverProxyProvider, on encountering certain exception, tries each OM
// once in a round robin fashion. After that it waits for configured time
// before attempting to contact all the OMs again. For other exceptions
// such as LeaderNotReadyException, the same OM is contacted again with a
// linearly increasing wait time.
private Set<String> attemptedOMs = new HashSet<>();
// Node id that was current when the last failover decision was taken;
// getWaitTime() compares it with currentProxyOMNodeId.
private String lastAttemptedOM;
// Multiplier applied to waitBetweenRetries when the same OM is retried.
private int numAttemptsOnSameOM = 0;
// Base wait between retries, read from the configuration in the constructor.
private final long waitBetweenRetries;
/**
 * Creates a failover proxy provider for the OM nodes of the given service.
 * The OM client configuration is loaded, the delegation token service is
 * computed, and the provider starts out pointing at the first entry of the
 * node id list.
 *
 * @param configuration configuration the OM settings are read from
 * @param ugi user passed along when RPC proxies are created
 * @param omServiceId OM service id whose nodes are loaded
 * @throws IOException declared by the configuration loading step
 */
public OMFailoverProxyProvider(ConfigurationSource configuration,
UserGroupInformation ugi, String omServiceId) throws IOException {
this.conf = configuration;
this.omVersion = RPC.getProtocolVersion(OzoneManagerProtocolPB.class);
this.ugi = ugi;
this.omServiceId = omServiceId;
// Populates omProxies / omProxyInfos / omNodeIDList; must run before the
// delegation token service is computed and before the first node id is read.
loadOMClientConfigs(conf, this.omServiceId);
this.delegationTokenService = computeDelegationTokenService();
currentProxyIndex = 0;
currentProxyOMNodeId = omNodeIDList.get(currentProxyIndex);
waitBetweenRetries = conf.getLong(
// NOTE(review): the conf.getLong( call above is cut off in this copy of the
// file: its configuration key, default value and terminator are missing and
// must be restored from the upstream source.
/**
 * Creates a provider without an explicit OM service id by delegating to the
 * three-argument constructor with a {@code null} service id.
 *
 * @param configuration configuration the OM settings are read from
 * @param ugi user passed along when RPC proxies are created
 * @throws IOException if the delegated constructor fails
 */
public OMFailoverProxyProvider(OzoneConfiguration configuration,
    UserGroupInformation ugi) throws IOException {
  this(configuration, ugi, null);
}
// Builds the per-node proxy bookkeeping (omProxies, omProxyInfos,
// omNodeIDList) from the configuration for the given OM service id.
// NOTE(review): several lines of this method are missing from this copy of
// the file (see the notes below); restore them from the upstream source
// before relying on this block.
private void loadOMClientConfigs(ConfigurationSource config, String omSvcId)
throws IOException {
this.omProxies = new HashMap<>();
this.omProxyInfos = new HashMap<>();
this.omNodeIDList = new ArrayList<>();
Collection<String> omServiceIds = Collections.singletonList(omSvcId);
// presumably emptyAsSingletonNull yields a single null element for an empty
// collection so the loops still run once in a non-HA setup -- TODO confirm
for (String serviceId : OmUtils.emptyAsSingletonNull(omServiceIds)) {
Collection<String> omNodeIds = OmUtils.getOMNodeIds(config, serviceId);
for (String nodeId : OmUtils.emptyAsSingletonNull(omNodeIds)) {
// The address key is the base OM address key suffixed with the service id
// and node id.
String rpcAddrKey = OmUtils.addKeySuffixes(OZONE_OM_ADDRESS_KEY,
serviceId, nodeId);
String rpcAddrStr = OmUtils.getOmRpcAddress(config, rpcAddrKey);
// NOTE(review): the body of this null check is not visible in this copy.
if (rpcAddrStr == null) {
// NOTE(review): the constructor call below is cut off after "nodeId,".
OMProxyInfo omProxyInfo = new OMProxyInfo(serviceId, nodeId,
if (omProxyInfo.getAddress() != null) {
// The proxy object itself is left null here; it is created on demand by
// createOMProxyIfNeeded.
ProxyInfo<OzoneManagerProtocolPB> proxyInfo =
new ProxyInfo(null, omProxyInfo.toString());
// For a non-HA OM setup, nodeId might be null. If so, we assign it
// a dummy value
if (nodeId == null) {
nodeId = OzoneConsts.OM_NODE_ID_DUMMY;
omProxies.put(nodeId, proxyInfo);
omProxyInfos.put(nodeId, omProxyInfo);
// NOTE(review): no statement adding nodeId to omNodeIDList is visible in
// this copy, although the constructor reads omNodeIDList.get(0).
} else {
LOG.error("Failed to create OM proxy for {} at address {}",
nodeId, rpcAddrStr);
// At least one OM proxy entry must have been registered.
if (omProxies.isEmpty()) {
// NOTE(review): the exception message below is cut off in this copy.
throw new IllegalArgumentException("Could not find any configured " +
"addresses for OM. Please configure the system with "
public synchronized String getCurrentProxyOMNodeId() {
return currentProxyOMNodeId;
// Creates an RPC proxy for OzoneManagerProtocolPB to the given OM address,
// using this provider's protocol version (omVersion) and user (ugi).
// NOTE(review): two statements of this method are cut off in this copy of
// the file (see below) and must be restored from the upstream source.
private OzoneManagerProtocolPB createOMProxy(InetSocketAddress omAddress)
throws IOException {
// NOTE(review): the initializer of hadoopConf is missing in this copy.
Configuration hadoopConf =
// NOTE(review): the last argument(s) of setProtocolEngine are missing.
RPC.setProtocolEngine(hadoopConf, OzoneManagerProtocolPB.class,
// The RPC timeout is taken from OmUtils.getOMClientRpcTimeOut and narrowed
// to int for RPC.getProxy.
return RPC.getProxy(OzoneManagerProtocolPB.class, omVersion, omAddress, ugi,
hadoopConf, NetUtils.getDefaultSocketFactory(hadoopConf),
(int) OmUtils.getOMClientRpcTimeOut(hadoopConf));
* Get the proxy object which should be used until the next failover event
* occurs. RPC proxy object is intialized lazily.
* @return the OM proxy object to invoke methods upon
public synchronized ProxyInfo getProxy() {
ProxyInfo currentProxyInfo = omProxies.get(currentProxyOMNodeId);
createOMProxyIfNeeded(currentProxyInfo, currentProxyOMNodeId);
return currentProxyInfo;
/**
 * Creates the proxy object if it does not already exist.
 */
// Lazily creates the RPC proxy for the given node when proxyInfo has none.
// The address comes from the OMProxyInfo registered for nodeId.
private void createOMProxyIfNeeded(ProxyInfo proxyInfo,
String nodeId) {
if (proxyInfo.proxy == null) {
InetSocketAddress address = omProxyInfos.get(nodeId).getAddress();
try {
OzoneManagerProtocolPB proxy = createOMProxy(address);
try {
// Store the new proxy directly on the existing ProxyInfo.
proxyInfo.proxy = proxy;
} catch (IllegalAccessError iae) {
// NOTE(review): the statement below is cut off in this copy of the file;
// only its tail is visible. It looks like a fresh ProxyInfo wrapping the
// new proxy is handed to some call when the direct assignment is rejected
// -- confirm against the upstream source.
new ProxyInfo<>(proxy, proxyInfo.proxyInfo));
} catch (IOException ioe) {
// Proxy creation failures are logged and rethrown unchecked, with the
// IOException preserved as the cause.
LOG.error("{} Failed to create RPC proxy to OM at {}",
this.getClass().getSimpleName(), address, ioe);
throw new RuntimeException(ioe);
public Text getCurrentProxyDelegationToken() {
return delegationTokenService;
private Text computeDelegationTokenService() {
// For HA, this will return "," separated address of all OM's.
List<String> addresses = new ArrayList<>();
for (Map.Entry<String, OMProxyInfo> omProxyInfoSet :
omProxyInfos.entrySet()) {
Text dtService = omProxyInfoSet.getValue().getDelegationTokenService();
// During client object creation when one of the OM configured address
// in unreachable, dtService can be null.
if (dtService != null) {
if (!addresses.isEmpty()) {
return new Text(String.join(",", addresses));
} else {
// If all OM addresses are unresolvable, set dt service to null. Let
// this fail in later step when during connection setup.
return null;
public Class<OzoneManagerProtocolPB> getInterface() {
return OzoneManagerProtocolPB.class;
* Called whenever an error warrants failing over. It is determined by the
* retry policy.
* This is a dummy call from {@link RetryInvocationHandler}. The actual
* failover should be performed using either
* {@link OMFailoverProxyProvider#performFailoverIfRequired(String)} or
* {@link OMFailoverProxyProvider#performFailoverToNextProxy()}.
* In {@link OzoneManagerProtocolClientSideTranslatorPB}, we first
* manually failover and then call the RetryAction FAILOVER_AND_RETRY. This
* is done because we do not want to always failover to the next proxy. If we
* get a OMNotLeaderException with a suggested leader, then we want to
* failover to that OM proxy instead. Hence, we failover manually and the
* {@link FailoverProxyProvider#performFailover(Object)} call should not do
* failover again.
public void performFailover(OzoneManagerProtocolPB currentProxy) {
if (LOG.isDebugEnabled()) {
int currentIndex = getCurrentProxyIndex();
LOG.debug("Failing over OM proxy to index: {}, nodeId: {}",
currentIndex, omNodeIDList.get(currentIndex));
* Performs failover if the leaderOMNodeId returned through OMReponse does
* not match the current leaderOMNodeId cached by the proxy provider.
public void performFailoverIfRequired(String newLeaderOMNodeId) {
if (newLeaderOMNodeId == null) {
LOG.debug("No suggested leader nodeId. Performing failover to next peer" +
" node");
} else {
if (updateLeaderOMNodeId(newLeaderOMNodeId)) {
LOG.debug("Failing over OM proxy to nodeId: {}", newLeaderOMNodeId);
* Performs failover if the leaderOMNodeId returned through OMResponse does
* not match the current leaderOMNodeId cached by the proxy provider.
public void performFailoverToNextProxy() {
int newProxyIndex = incrementProxyIndex();
if (LOG.isDebugEnabled()) {
LOG.debug("Incrementing OM proxy index to {}, nodeId: {}",
newProxyIndex, omNodeIDList.get(newProxyIndex));
* Update the proxy index to the next proxy in the list.
* @return the new proxy index
private synchronized int incrementProxyIndex() {
// Before failing over to next proxy, add the proxy OM (which has
// returned an exception) to the list of attemptedOMs.
lastAttemptedOM = currentProxyOMNodeId;
currentProxyIndex = (currentProxyIndex + 1) % omProxies.size();
currentProxyOMNodeId = omNodeIDList.get(currentProxyIndex);
return currentProxyIndex;
/**
 * Failover to the OM proxy specified by the new leader OMNodeId.
 *
 * @param newLeaderOMNodeId OMNodeId to failover to.
 * @return true if failover is successful, false otherwise.
 */
// Switches the current proxy to newLeaderOMNodeId when that id differs from
// the current one and a proxy entry exists for it.
// NOTE(review): the closing braces of this method are missing in this copy
// of the file, so it cannot be told from here whether the else branch below
// belongs to the containsKey check or to the outer equals check -- confirm
// against the upstream source before touching this logic.
synchronized boolean updateLeaderOMNodeId(String newLeaderOMNodeId) {
if (!currentProxyOMNodeId.equals(newLeaderOMNodeId)) {
if (omProxies.containsKey(newLeaderOMNodeId)) {
// Remember the OM being left, then point both the node id and the index
// at the suggested leader.
lastAttemptedOM = currentProxyOMNodeId;
currentProxyOMNodeId = newLeaderOMNodeId;
currentProxyIndex = omNodeIDList.indexOf(currentProxyOMNodeId);
return true;
} else {
// No switch happens on this path; the current OM is recorded as the last
// attempted one, which getWaitTime() later compares against.
lastAttemptedOM = currentProxyOMNodeId;
return false;
private synchronized int getCurrentProxyIndex() {
return currentProxyIndex;
public synchronized long getWaitTime() {
if (currentProxyOMNodeId.equals(lastAttemptedOM)) {
// Clear attemptedOMs list as round robin has been broken. Add only the
// The same OM will be contacted again. So wait and then retry.
return (waitBetweenRetries * numAttemptsOnSameOM);
// Reset numAttemptsOnSameOM as we failed over to a different OM.
numAttemptsOnSameOM = 0;
// OMs are being contacted in round robin way. Check if all the OMs have
// been contacted in this attempt.
for (String omNodeID : omProxyInfos.keySet()) {
if (!attemptedOMs.contains(omNodeID)) {
return 0;
// This implies all the OMs have been contacted once. Return true and
// clear the list as we are going to inject a wait and the next check
// should not include these atttempts again.
return waitBetweenRetries;
* Close all the proxy objects which have been opened over the lifetime of
* the proxy provider.
public synchronized void close() throws IOException {
for (ProxyInfo<OzoneManagerProtocolPB> proxy : omProxies.values()) {
OzoneManagerProtocolPB omProxy = proxy.proxy;
if (omProxy != null) {
public List<ProxyInfo> getOMProxies() {
return new ArrayList<ProxyInfo>(omProxies.values());
public List<OMProxyInfo> getOMProxyInfos() {
return new ArrayList<OMProxyInfo>(omProxyInfos.values());