blob: a736d27b53a324c886c9597b5b607608b19e7c34 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode.ha;
import java.io.IOException;
import java.net.URISyntaxException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.test.MultithreadedTestUtil.RepeatingTestThread;
import org.apache.hadoop.test.MultithreadedTestUtil.TestContext;
/**
* Utility class to start an HA cluster, and then start threads
* to periodically fail back and forth, accelerate block deletion
* processing, etc.
*/
public class HAStressTestHarness {
final Configuration conf;
private MiniDFSCluster cluster;
static final int BLOCK_SIZE = 1024;
final TestContext testCtx = new TestContext();
private int nns = 2;
public HAStressTestHarness() {
conf = new Configuration();
conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
// Increase max streams so that we re-replicate quickly.
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 16);
conf.setInt(
DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY, 16);
}
/**
* Set the number of namenodes that should be run. This must be set before calling
* {@link #startCluster()}
*/
public void setNumberOfNameNodes(int nns) {
this.nns = nns;
}
/**
* Start and return the MiniDFSCluster.
*/
public MiniDFSCluster startCluster() throws IOException {
cluster = new MiniDFSCluster.Builder(conf)
.nnTopology(MiniDFSNNTopology.simpleHATopology(nns))
.numDataNodes(3)
.build();
return cluster;
}
/**
* Return a filesystem with client-failover configured for the
* cluster.
*/
public FileSystem getFailoverFs() throws IOException, URISyntaxException {
return HATestUtil.configureFailoverFs(cluster, conf);
}
/**
* Add a thread which periodically triggers deletion reports,
* heartbeats, and NN-side block work.
* @param interval millisecond period on which to run
*/
public void addReplicationTriggerThread(final int interval) {
testCtx.addThread(new RepeatingTestThread(testCtx) {
@Override
public void doAnAction() throws Exception {
for (DataNode dn : cluster.getDataNodes()) {
DataNodeTestUtils.triggerDeletionReport(dn);
DataNodeTestUtils.triggerHeartbeat(dn);
}
for (int i = 0; i < 2; i++) {
NameNode nn = cluster.getNameNode(i);
BlockManagerTestUtil.computeAllPendingWork(
nn.getNamesystem().getBlockManager());
}
Thread.sleep(interval);
}
});
}
/**
* Add a thread which periodically triggers failover back and forth between the namenodes.
*/
public void addFailoverThread(final int msBetweenFailovers) {
testCtx.addThread(new RepeatingTestThread(testCtx) {
@Override
public void doAnAction() throws Exception {
// fail over from one namenode to the next, all the way back to the original NN
for (int i = 0; i < nns; i++) {
// next node, mod nns so we wrap to the 0th NN on the last iteration
int next = (i + 1) % nns;
System.err.println("==============================\n"
+ "[Starting] Failing over from " + i + "->" + next + "\n"
+ "==============================");
cluster.transitionToStandby(i);
cluster.transitionToActive(next);
System.err.println("==============================\n"
+ "[Completed] Failing over from " + i + "->" + next + ". Sleeping for "+
(msBetweenFailovers/1000) +"sec \n"
+ "==============================");
Thread.sleep(msBetweenFailovers);
}
}
});
}
/**
* Start all of the threads which have been added.
*/
public void startThreads() {
this.testCtx.startThreads();
}
/**
* Stop threads, propagating any exceptions that might have been thrown.
*/
public void stopThreads() throws Exception {
this.testCtx.stop();
}
/**
* Shutdown the minicluster, as well as any of the running threads.
*/
public void shutdown() throws Exception {
this.testCtx.stop();
if (cluster != null) {
this.cluster.shutdown();
cluster = null;
}
}
}