blob: e5ab3ab27707a2dfd4c139867a0c17b5fdbd8017 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.router;
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.simulateSlowNamenode;
import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains;
import static org.apache.hadoop.test.GenericTestUtils.waitFor;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.NamenodeContext;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
import org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCMetrics;
import org.apache.hadoop.hdfs.server.federation.metrics.NamenodeBeanMetrics;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.NamenodeStatusReport;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.ipc.RemoteException;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import com.google.common.base.Supplier;
/**
 * Test retry behavior of the Router RPC Client.
 */
public class TestRouterRPCClientRetries {

  /** Federated HDFS cluster (2 nameservices) backed by the State Store. */
  private static StateStoreDFSCluster cluster;
  /** Context of the first namenode of the first nameservice. */
  private static NamenodeContext nnContext1;
  /** Router under test, picked at random from the cluster. */
  private static RouterContext routerContext;
  /** Resolver of the router under test; used to inject bogus registrations. */
  private static MembershipNamenodeResolver resolver;
  /** Client protocol speaking to the router under test. */
  private static ClientProtocol routerProtocol;

  /** Fail any single test that runs longer than 100 seconds. */
  @Rule
  public final Timeout testTimeout = Timeout.seconds(100);

  @Before
  public void setUp() throws Exception {
    // Build and start a federated cluster with 2 nameservices
    cluster = new StateStoreDFSCluster(false, 2);
    Configuration routerConf = new RouterConfigBuilder()
        .stateStore()
        .metrics()
        .admin()
        .rpc()
        .build();
    // Expire the cached datanode report quickly so testNamenodeMetricsSlow
    // can observe refreshed values within its wait window
    routerConf.setTimeDuration(
        NamenodeBeanMetrics.DN_REPORT_CACHE_EXPIRE, 1, TimeUnit.SECONDS);

    // Reduce IPC client connection retry count and interval so that the
    // failure scenarios under test complete quickly
    Configuration clientConf = new Configuration(false);
    clientConf.setInt(
        CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 1);
    clientConf.setInt(
        CommonConfigurationKeys.IPC_CLIENT_CONNECT_RETRY_INTERVAL_KEY, 100);

    // Set the DNs to belong to only one subcluster
    cluster.setIndependentDNs();

    cluster.addRouterOverrides(routerConf);
    // Override some settings for the client
    cluster.startCluster(clientConf);
    cluster.startRouters();
    cluster.waitClusterUp();

    nnContext1 = cluster.getNamenode(cluster.getNameservices().get(0), null);
    routerContext = cluster.getRandomRouter();
    resolver = (MembershipNamenodeResolver) routerContext.getRouter()
        .getNamenodeResolver();
    routerProtocol = routerContext.getClient().getNamenode();
  }

  @After
  public void tearDown() {
    if (cluster != null) {
      // setUp() may have failed before routerContext was assigned; guard the
      // router stop so the mini cluster is still shut down in that case
      if (routerContext != null) {
        cluster.stopRouter(routerContext);
        routerContext = null;
      }
      cluster.shutdown();
      cluster = null;
    }
  }

  /**
   * Get the RPC metrics of the router under test.
   * @return Federation RPC metrics of the random router picked in setUp().
   */
  private static FederationRPCMetrics getRouterRpcMetrics() {
    return routerContext.getRouter().getRpcServer().getRPCMetrics();
  }

  /**
   * Verify that when every nameservice is unavailable, an RPC through the
   * router fails with a RemoteException and is retried exactly once.
   * @throws Exception If the test cannot run.
   */
  @Test
  public void testRetryWhenAllNameServiceDown() throws Exception {
    // Shut down the whole dfs cluster so no namenode can answer
    MiniDFSCluster dfsCluster = cluster.getCluster();
    dfsCluster.shutdown();

    // Register an invalid namenode report
    registerInvalidNameReport();

    // Create a directory via the router
    String dirPath = "/testRetryWhenClusterisDown";
    FsPermission permission = new FsPermission("705");
    try {
      routerProtocol.mkdirs(dirPath, permission, false);
      fail("Should have thrown RemoteException error.");
    } catch (RemoteException e) {
      String ns0 = cluster.getNameservices().get(0);
      assertExceptionContains(
          "No namenode available under nameservice " + ns0, e);
    }

    // Verify the retry times, it should only retry one time
    assertEquals(1, getRouterRpcMetrics().getProxyOpRetries());
  }

  /**
   * Verify that when only one nameservice is down, an RPC fanned out to all
   * nameservices (renewLease) still succeeds, retrying once for the
   * unavailable nameservice.
   * @throws Exception If the test cannot run.
   */
  @Test
  public void testRetryWhenOneNameServiceDown() throws Exception {
    // Shut down only the first namenode
    MiniDFSCluster dfsCluster = cluster.getCluster();
    dfsCluster.shutdownNameNode(0);

    // Register an invalid namenode report
    registerInvalidNameReport();

    // Renew lease for the DFS client, it will succeed because the other
    // nameservice is still up
    DFSClient client = nnContext1.getClient();
    routerProtocol.renewLease(client.getClientName());

    // Verify the retry times, it will retry one time for ns0
    assertEquals(1, getRouterRpcMetrics().getProxyOpRetries());
  }

  /**
   * Register an invalid (not valid-for-use) namenode report for the first
   * namenode of the first nameservice, then refresh the resolver cache so the
   * router sees it immediately.
   * @throws IOException If the report cannot be registered.
   */
  private void registerInvalidNameReport() throws IOException {
    String ns0 = cluster.getNameservices().get(0);
    List<? extends FederationNamenodeContext> origin = resolver
        .getNamenodesForNameserviceId(ns0);
    FederationNamenodeContext nnInfo = origin.get(0);
    NamenodeStatusReport report = new NamenodeStatusReport(ns0,
        nnInfo.getNamenodeId(), nnInfo.getRpcAddress(),
        nnInfo.getServiceAddress(), nnInfo.getLifelineAddress(),
        nnInfo.getWebAddress());
    // Mark the registration invalid so the router must fail over/retry
    report.setRegistrationValid(false);
    assertTrue(resolver.registerNamenode(report));
    resolver.loadCache(true);
  }

  /**
   * Verify that the router live-node metrics degrade gracefully when
   * namenodes become slow: a slow subcluster is dropped from the report.
   * @throws Exception If the test cannot run.
   */
  @Test
  public void testNamenodeMetricsSlow() throws Exception {
    final Router router = routerContext.getRouter();
    final NamenodeBeanMetrics metrics = router.getNamenodeMetrics();

    // Initially, there are 4 DNs in total
    final String jsonString0 = metrics.getLiveNodes();
    assertEquals(4, getNumDatanodes(jsonString0));

    // The response should be cached
    assertEquals(jsonString0, metrics.getLiveNodes());

    // Check that the cached value gets updated eventually
    waitUpdateLiveNodes(jsonString0, metrics);
    final String jsonString2 = metrics.getLiveNodes();
    assertNotEquals(jsonString0, jsonString2);
    assertEquals(4, getNumDatanodes(jsonString2));

    // Making subcluster0 slow to reply, should only get DNs from nn1
    MiniDFSCluster dfsCluster = cluster.getCluster();
    NameNode nn0 = dfsCluster.getNameNode(0);
    simulateSlowNamenode(nn0, 3);
    waitUpdateLiveNodes(jsonString2, metrics);
    final String jsonString3 = metrics.getLiveNodes();
    assertEquals(2, getNumDatanodes(jsonString3));

    // Making subcluster1 slow to reply, shouldn't get any DNs
    NameNode nn1 = dfsCluster.getNameNode(1);
    simulateSlowNamenode(nn1, 3);
    waitUpdateLiveNodes(jsonString3, metrics);
    final String jsonString4 = metrics.getLiveNodes();
    assertEquals(0, getNumDatanodes(jsonString4));
  }

  /**
   * Get the number of nodes in a JSON string.
   * @param jsonString JSON string containing nodes keyed by node name.
   * @return Number of nodes; 0 for an empty JSON object.
   * @throws JSONException If the JSON string is not properly formed.
   */
  private static int getNumDatanodes(final String jsonString)
      throws JSONException {
    JSONObject jsonObject = new JSONObject(jsonString);
    if (jsonObject.length() == 0) {
      return 0;
    }
    return jsonObject.names().length();
  }

  /**
   * Wait until the cached live nodes value is updated, polling every 500 ms
   * for up to 5 seconds.
   * @param oldValue Old cached value.
   * @param metrics Namenode metrics beans to get the live nodes from.
   * @throws Exception If it cannot wait.
   */
  private static void waitUpdateLiveNodes(
      final String oldValue, final NamenodeBeanMetrics metrics)
          throws Exception {
    waitFor(new Supplier<Boolean>() {
      @Override
      public Boolean get() {
        return !oldValue.equals(metrics.getLiveNodes());
      }
    }, 500, 5 * 1000);
  }
}