blob: bdd7a200ca911db1cec23353de5aa0571bdde090 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.proxy;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.conf.ConfigurationException;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.scm.ha.SCMNodeInfo;
import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolPB;
import org.apache.hadoop.hdds.utils.LegacyHadoopConfigurationSource;
import org.apache.hadoop.io.retry.FailoverProxyProvider;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.apache.hadoop.ozone.OzoneConsts.SCM_DUMMY_SERVICE_ID;
/**
 * Failover proxy provider for the SCM block location protocol.
 *
 * <p>Keeps one lazily-created RPC proxy per configured SCM node and tracks
 * which node is currently targeted. Node switching is driven by
 * {@link #performFailoverToAssignedLeader(String)}, which either jumps to a
 * suggested leader node or advances round-robin through the configured nodes.
 */
public class SCMBlockLocationFailoverProxyProvider implements
    FailoverProxyProvider<ScmBlockLocationProtocolPB>, Closeable {
  public static final Logger LOG =
      LoggerFactory.getLogger(SCMBlockLocationFailoverProxyProvider.class);

  /** Proxy holders keyed by SCM node id; RPC proxies are created on demand. */
  private final Map<String, ProxyInfo<ScmBlockLocationProtocolPB>> scmProxies;
  /** Address/service metadata keyed by SCM node id. */
  private final Map<String, SCMProxyInfo> scmProxyInfoMap;
  /** Node ids in configuration order; defines the round-robin sequence. */
  private List<String> scmNodeIds;
  private String currentProxySCMNodeId;
  private int currentProxyIndex;
  private final ConfigurationSource conf;
  private final long scmVersion;
  private String scmServiceId;
  private String lastAttemptedLeader;
  private final int maxRetryCount;
  private final long retryInterval;
  private final UserGroupInformation ugi;

  /**
   * Creates a provider covering every SCM node described by {@code conf}.
   * The first configured node becomes the initial target.
   *
   * @param conf configuration used to discover SCM nodes and client settings
   * @throws RuntimeException if the current user cannot be obtained from UGI
   * @throws ConfigurationException if no SCM node is configured, or a node
   *     has no block client address
   */
  public SCMBlockLocationFailoverProxyProvider(ConfigurationSource conf) {
    this.conf = conf;
    this.scmVersion = RPC.getProtocolVersion(ScmBlockLocationProtocolPB.class);
    try {
      this.ugi = UserGroupInformation.getCurrentUser();
    } catch (IOException ex) {
      LOG.error("Unable to fetch user credentials from UGI", ex);
      throw new RuntimeException(ex);
    }
    this.scmProxies = new HashMap<>();
    this.scmProxyInfoMap = new HashMap<>();
    loadConfigs();
    // Set some constant for non-HA. This must run after loadConfigs(), which
    // overwrites scmServiceId with the service id reported by each node.
    if (scmServiceId == null) {
      scmServiceId = SCM_DUMMY_SERVICE_ID;
    }
    this.currentProxyIndex = 0;
    currentProxySCMNodeId = scmNodeIds.get(currentProxyIndex);
    SCMClientConfig config = conf.getObject(SCMClientConfig.class);
    this.maxRetryCount = config.getRetryCount();
    this.retryInterval = config.getRetryInterval();
  }

  /**
   * Populates the node-id list and the per-node proxy/metadata maps from
   * {@link SCMNodeInfo#buildNodeInfo}. Proxies themselves are not created
   * here; each map entry starts with a null proxy.
   *
   * @throws ConfigurationException if a node lacks a block client address or
   *     no node is configured at all
   */
  private void loadConfigs() {
    scmNodeIds = new ArrayList<>();
    List<SCMNodeInfo> scmNodeInfoList = SCMNodeInfo.buildNodeInfo(conf);
    for (SCMNodeInfo scmNodeInfo : scmNodeInfoList) {
      if (scmNodeInfo.getBlockClientAddress() == null) {
        throw new ConfigurationException("SCM BlockClient Address could not " +
            "be obtained from config. Config is not properly defined");
      }
      InetSocketAddress scmBlockClientAddress =
          NetUtils.createSocketAddr(scmNodeInfo.getBlockClientAddress());
      scmServiceId = scmNodeInfo.getServiceId();
      String scmNodeId = scmNodeInfo.getNodeId();
      scmNodeIds.add(scmNodeId);
      SCMProxyInfo scmProxyInfo = new SCMProxyInfo(
          scmNodeInfo.getServiceId(), scmNodeId, scmBlockClientAddress);
      scmProxies.put(scmNodeId,
          new ProxyInfo<>(null, scmProxyInfo.toString()));
      scmProxyInfoMap.put(scmNodeId, scmProxyInfo);
    }
    // Without at least one node the constructor cannot pick an initial
    // target; fail with a clear message instead of an index error.
    if (scmNodeIds.isEmpty()) {
      throw new ConfigurationException("No SCM node could be obtained from " +
          "config. Config is not properly defined");
    }
  }

  /**
   * Returns the SCM node id currently targeted. Despite the "OM" in its name,
   * this is an SCM node id; the name is kept for existing callers.
   */
  @VisibleForTesting
  public synchronized String getCurrentProxyOMNodeId() {
    return currentProxySCMNodeId;
  }

  /**
   * Returns the proxy holder for the current SCM node, creating the RPC proxy
   * on first use.
   */
  @Override
  public synchronized ProxyInfo<ScmBlockLocationProtocolPB> getProxy() {
    ProxyInfo<ScmBlockLocationProtocolPB> currentProxyInfo =
        scmProxies.get(currentProxySCMNodeId);
    createSCMProxyIfNeeded(currentProxyInfo, currentProxySCMNodeId);
    // Re-read the map: createSCMProxyIfNeeded may have replaced the entry
    // with a new ProxyInfo when the proxy field could not be assigned.
    return scmProxies.get(currentProxySCMNodeId);
  }

  /**
   * Intentionally a no-op. When this provider is used with
   * {@link #getSCMBlockLocationRetryPolicy(String)}, the target node has
   * already been switched inside the retry policy.
   */
  @Override
  public void performFailover(ScmBlockLocationProtocolPB newLeader) {
    // Should do nothing here.
    LOG.debug("Failing over to next proxy. {}", getCurrentProxyOMNodeId());
  }

  /**
   * Switches the current target to {@code newLeader} if it is a known node
   * other than the current one; otherwise advances round-robin to the next
   * configured node.
   *
   * @param newLeader suggested leader node id, or {@code null} if unknown
   */
  public synchronized void performFailoverToAssignedLeader(String newLeader) {
    if (newLeader != null && assignLeaderToNode(newLeader)) {
      LOG.debug("Failing over SCM proxy to nodeId: {}", newLeader);
      return;
    }
    // No usable leader hint (null, unknown, or already current): move on to
    // the next proxy in round-robin order.
    nextProxyIndex();
    LOG.debug("Failing over SCM proxy to next nodeId: {}",
        currentProxySCMNodeId);
  }

  @Override
  public Class<ScmBlockLocationProtocolPB> getInterface() {
    return ScmBlockLocationProtocolPB.class;
  }

  /** Stops every RPC proxy that has been created so far. */
  @Override
  public synchronized void close() throws IOException {
    for (ProxyInfo<ScmBlockLocationProtocolPB> proxy : scmProxies.values()) {
      ScmBlockLocationProtocolPB scmProxy = proxy.proxy;
      if (scmProxy != null) {
        RPC.stopProxy(scmProxy);
      }
    }
  }

  /**
   * Decides whether another failover attempt is allowed.
   *
   * @param failovers number of failovers performed so far
   * @return FAILOVER_AND_RETRY with the configured interval while
   *     {@code failovers} is below the configured retry count, else FAIL
   */
  public RetryAction getRetryAction(int failovers) {
    if (failovers < maxRetryCount) {
      return new RetryAction(RetryAction.RetryDecision.FAILOVER_AND_RETRY,
          getRetryInterval());
    } else {
      return RetryAction.FAIL;
    }
  }

  private synchronized long getRetryInterval() {
    // TODO add exponential backup
    return retryInterval;
  }

  /**
   * Advances the current target to the next node id in configuration order,
   * wrapping around at the end, and records the previous target.
   *
   * @return the new current index
   */
  private synchronized int nextProxyIndex() {
    lastAttemptedLeader = currentProxySCMNodeId;
    // round robin the next proxy; use the list being indexed for the modulus
    currentProxyIndex = (currentProxyIndex + 1) % scmNodeIds.size();
    currentProxySCMNodeId = scmNodeIds.get(currentProxyIndex);
    return currentProxyIndex;
  }

  /**
   * Points the current target at {@code newLeaderNodeId} if it is a known
   * node different from the current one.
   *
   * @return {@code true} if the target changed; {@code false} if the node is
   *     already current or is not a configured node
   */
  private synchronized boolean assignLeaderToNode(String newLeaderNodeId) {
    if (!currentProxySCMNodeId.equals(newLeaderNodeId)) {
      if (scmProxies.containsKey(newLeaderNodeId)) {
        lastAttemptedLeader = currentProxySCMNodeId;
        currentProxySCMNodeId = newLeaderNodeId;
        currentProxyIndex = scmNodeIds.indexOf(currentProxySCMNodeId);
        return true;
      }
    } else {
      lastAttemptedLeader = currentProxySCMNodeId;
    }
    return false;
  }

  /**
   * Creates the RPC proxy for {@code nodeId} if {@code proxyInfo} does not
   * hold one yet.
   *
   * <p>If the {@code proxy} field cannot be assigned (an
   * {@link IllegalAccessError}, e.g. when it is final in the Hadoop version
   * on the classpath), a new {@link ProxyInfo} replaces the map entry instead.
   *
   * @throws RuntimeException wrapping the IOException if proxy creation fails
   */
  private void createSCMProxyIfNeeded(
      ProxyInfo<ScmBlockLocationProtocolPB> proxyInfo, String nodeId) {
    if (proxyInfo.proxy != null) {
      return;
    }
    InetSocketAddress address = scmProxyInfoMap.get(nodeId).getAddress();
    ScmBlockLocationProtocolPB proxy;
    try {
      proxy = createSCMProxy(address);
    } catch (IOException ioe) {
      LOG.error("{} Failed to create RPC proxy to SCM at {}",
          this.getClass().getSimpleName(), address, ioe);
      throw new RuntimeException(ioe);
    }
    try {
      proxyInfo.proxy = proxy;
    } catch (IllegalAccessError iae) {
      scmProxies.put(nodeId, new ProxyInfo<>(proxy, proxyInfo.proxyInfo));
    }
  }

  /**
   * Builds a protobuf RPC proxy to the SCM block location endpoint at
   * {@code scmAddress}, using the configured RPC timeout.
   */
  private ScmBlockLocationProtocolPB createSCMProxy(
      InetSocketAddress scmAddress) throws IOException {
    Configuration hadoopConf =
        LegacyHadoopConfigurationSource.asHadoopConfiguration(conf);
    RPC.setProtocolEngine(hadoopConf, ScmBlockLocationProtocolPB.class,
        ProtobufRpcEngine.class);
    // FailoverOnNetworkException ensures that the IPC layer does not attempt
    // retries on the same SCM in case of connection exception. This retry
    // policy essentially results in TRY_ONCE_THEN_FAIL.
    RetryPolicy connectionRetryPolicy = RetryPolicies
        .failoverOnNetworkException(0);
    return RPC.getProtocolProxy(ScmBlockLocationProtocolPB.class, scmVersion,
        scmAddress, ugi, hadoopConf,
        NetUtils.getDefaultSocketFactory(hadoopConf),
        (int) conf.getObject(SCMClientConfig.class).getRpcTimeOut(),
        connectionRetryPolicy).getProxy();
  }

  /**
   * Returns a retry policy that, on every failure, switches the target via
   * {@link #performFailoverToAssignedLeader(String)} and then defers to
   * {@link #getRetryAction(int)}.
   *
   * @param newLeader leader node id to prefer on failover, or {@code null}
   */
  public RetryPolicy getSCMBlockLocationRetryPolicy(String newLeader) {
    RetryPolicy retryPolicy = new RetryPolicy() {
      @Override
      public RetryAction shouldRetry(Exception e, int retry,
          int failover, boolean isIdempotentOrAtMostOnce) {
        performFailoverToAssignedLeader(newLeader);
        return getRetryAction(failover);
      }
    };
    return retryPolicy;
  }
}