blob: 32684de5b73f29f79d78749bb047011138a75702 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.om.ha;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.retry.FailoverProxyProvider;
import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.ozone.OmUtils;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
/**
* A failover proxy provider implementation which allows clients to configure
* multiple OMs to connect to. In case of OM failover, client can try
* connecting to another OM node from the list of proxies.
*/
public class OMFailoverProxyProvider implements
FailoverProxyProvider<OzoneManagerProtocolPB>, Closeable {
public static final Logger LOG =
LoggerFactory.getLogger(OMFailoverProxyProvider.class);
// Map of OMNodeID to its proxy
private Map<String, ProxyInfo<OzoneManagerProtocolPB>> omProxies;
private Map<String, OMProxyInfo> omProxyInfos;
private List<String> omNodeIDList;
private String currentProxyOMNodeId;
private int currentProxyIndex;
private final Configuration conf;
private final long omVersion;
private final UserGroupInformation ugi;
private final Text delegationTokenService;
private final String omServiceId;
public OMFailoverProxyProvider(OzoneConfiguration configuration,
UserGroupInformation ugi, String omServiceId) throws IOException {
this.conf = configuration;
this.omVersion = RPC.getProtocolVersion(OzoneManagerProtocolPB.class);
this.ugi = ugi;
this.omServiceId = omServiceId;
loadOMClientConfigs(conf, this.omServiceId);
this.delegationTokenService = computeDelegationTokenService();
currentProxyIndex = 0;
currentProxyOMNodeId = omNodeIDList.get(currentProxyIndex);
}
public OMFailoverProxyProvider(OzoneConfiguration configuration,
UserGroupInformation ugi) throws IOException {
this(configuration, ugi, null);
}
private void loadOMClientConfigs(Configuration config, String omSvcId)
throws IOException {
this.omProxies = new HashMap<>();
this.omProxyInfos = new HashMap<>();
this.omNodeIDList = new ArrayList<>();
Collection<String> omServiceIds = Collections.singletonList(omSvcId);
for (String serviceId : OmUtils.emptyAsSingletonNull(omServiceIds)) {
Collection<String> omNodeIds = OmUtils.getOMNodeIds(config, serviceId);
for (String nodeId : OmUtils.emptyAsSingletonNull(omNodeIds)) {
String rpcAddrKey = OmUtils.addKeySuffixes(OZONE_OM_ADDRESS_KEY,
serviceId, nodeId);
String rpcAddrStr = OmUtils.getOmRpcAddress(config, rpcAddrKey);
if (rpcAddrStr == null) {
continue;
}
OMProxyInfo omProxyInfo = new OMProxyInfo(nodeId, rpcAddrStr);
if (omProxyInfo.getAddress() != null) {
ProxyInfo<OzoneManagerProtocolPB> proxyInfo =
new ProxyInfo(null, omProxyInfo.toString());
// For a non-HA OM setup, nodeId might be null. If so, we assign it
// a dummy value
if (nodeId == null) {
nodeId = OzoneConsts.OM_NODE_ID_DUMMY;
}
omProxies.put(nodeId, proxyInfo);
omProxyInfos.put(nodeId, omProxyInfo);
omNodeIDList.add(nodeId);
} else {
LOG.error("Failed to create OM proxy for {} at address {}",
nodeId, rpcAddrStr);
}
}
}
if (omProxies.isEmpty()) {
throw new IllegalArgumentException("Could not find any configured " +
"addresses for OM. Please configure the system with "
+ OZONE_OM_ADDRESS_KEY);
}
}
@VisibleForTesting
public synchronized String getCurrentProxyOMNodeId() {
return currentProxyOMNodeId;
}
private OzoneManagerProtocolPB createOMProxy(InetSocketAddress omAddress)
throws IOException {
RPC.setProtocolEngine(conf, OzoneManagerProtocolPB.class,
ProtobufRpcEngine.class);
return RPC.getProxy(OzoneManagerProtocolPB.class, omVersion, omAddress, ugi,
conf, NetUtils.getDefaultSocketFactory(conf),
Client.getRpcTimeout(conf));
}
/**
* Get the proxy object which should be used until the next failover event
* occurs. RPC proxy object is intialized lazily.
* @return the OM proxy object to invoke methods upon
*/
@Override
public synchronized ProxyInfo getProxy() {
ProxyInfo currentProxyInfo = omProxies.get(currentProxyOMNodeId);
createOMProxyIfNeeded(currentProxyInfo, currentProxyOMNodeId);
return currentProxyInfo;
}
/**
* Creates proxy object if it does not already exist.
*/
private void createOMProxyIfNeeded(ProxyInfo proxyInfo,
String nodeId) {
if (proxyInfo.proxy == null) {
InetSocketAddress address = omProxyInfos.get(nodeId).getAddress();
try {
proxyInfo.proxy = createOMProxy(address);
} catch (IOException ioe) {
LOG.error("{} Failed to create RPC proxy to OM at {}",
this.getClass().getSimpleName(), address, ioe);
throw new RuntimeException(ioe);
}
}
}
public Text getCurrentProxyDelegationToken() {
return delegationTokenService;
}
private Text computeDelegationTokenService() {
// For HA, this will return "," separated address of all OM's.
StringBuilder rpcAddress = new StringBuilder();
int count = 0;
for (Map.Entry<String, OMProxyInfo> omProxyInfoSet :
omProxyInfos.entrySet()) {
count++;
rpcAddress =
rpcAddress.append(
omProxyInfoSet.getValue().getDelegationTokenService());
if (omProxyInfos.size() != count) {
rpcAddress.append(",");
}
}
return new Text(rpcAddress.toString());
}
/**
* Called whenever an error warrants failing over. It is determined by the
* retry policy.
*/
@Override
public void performFailover(OzoneManagerProtocolPB currentProxy) {
int newProxyIndex = incrementProxyIndex();
if (LOG.isDebugEnabled()) {
LOG.debug("Failing over OM proxy to index: {}, nodeId: {}",
newProxyIndex, omNodeIDList.get(newProxyIndex));
}
}
/**
* Update the proxy index to the next proxy in the list.
* @return the new proxy index
*/
private synchronized int incrementProxyIndex() {
currentProxyIndex = (currentProxyIndex + 1) % omProxies.size();
currentProxyOMNodeId = omNodeIDList.get(currentProxyIndex);
return currentProxyIndex;
}
@Override
public Class<OzoneManagerProtocolPB> getInterface() {
return OzoneManagerProtocolPB.class;
}
/**
* Performs failover if the leaderOMNodeId returned through OMReponse does
* not match the current leaderOMNodeId cached by the proxy provider.
*/
public void performFailoverIfRequired(String newLeaderOMNodeId) {
if (newLeaderOMNodeId == null) {
LOG.debug("No suggested leader nodeId. Performing failover to next peer" +
" node");
performFailover(null);
} else {
if (updateLeaderOMNodeId(newLeaderOMNodeId)) {
LOG.debug("Failing over OM proxy to nodeId: {}", newLeaderOMNodeId);
}
}
}
/**
* Failover to the OM proxy specified by the new leader OMNodeId.
* @param newLeaderOMNodeId OMNodeId to failover to.
* @return true if failover is successful, false otherwise.
*/
synchronized boolean updateLeaderOMNodeId(String newLeaderOMNodeId) {
if (!currentProxyOMNodeId.equals(newLeaderOMNodeId)) {
if (omProxies.containsKey(newLeaderOMNodeId)) {
currentProxyOMNodeId = newLeaderOMNodeId;
currentProxyIndex = omNodeIDList.indexOf(currentProxyOMNodeId);
return true;
}
}
return false;
}
/**
* Close all the proxy objects which have been opened over the lifetime of
* the proxy provider.
*/
@Override
public synchronized void close() throws IOException {
for (ProxyInfo<OzoneManagerProtocolPB> proxy : omProxies.values()) {
OzoneManagerProtocolPB omProxy = proxy.proxy;
if (omProxy != null) {
RPC.stopProxy(omProxy);
}
}
}
@VisibleForTesting
public List<ProxyInfo> getOMProxies() {
return new ArrayList<ProxyInfo>(omProxies.values());
}
@VisibleForTesting
public List<OMProxyInfo> getOMProxyInfos() {
return new ArrayList<OMProxyInfo>(omProxyInfos.values());
}
}