blob: 09c3a88064f7e9dd0f5f67882cfe663bc9e956e0 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.dfs;
import java.io.*;
import java.net.*;
import java.util.ArrayList;
import java.util.Collection;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.dfs.FSConstants.StartupOption;
import org.apache.hadoop.fs.*;
/**
* This class creates a single-process DFS cluster for junit testing.
* The data directories for DFS are undering the testing directory.
*/
public class MiniDFSCluster {
private Configuration conf;
private NameNode nameNode;
private ArrayList<DataNode> dataNodes = new ArrayList<DataNode>();
private File base_dir;
private File data_dir;
/**
* Modify the config and start up the servers with the given operation.
* Servers will be started on free ports.
* <p>
* The caller must manage the creation of NameNode and DataNode directories
* and have already set dfs.name.dir and dfs.data.dir in the given conf.
*
* @param conf the base configuration to use in starting the servers. This
* will be modified as necessary.
* @param numDataNodes Number of DataNodes to start; may be zero
* @param nameNodeOperation the operation with which to start the servers. If null
* or StartupOption.FORMAT, then StartupOption.REGULAR will be used.
*/
public MiniDFSCluster(Configuration conf,
int numDataNodes,
StartupOption nameNodeOperation) throws IOException {
this(0, conf, numDataNodes, false, false, nameNodeOperation, null);
}
/**
* Modify the config and start up the servers. The rpc and info ports for
* servers are guaranteed to use free ports.
* <p>
* NameNode and DataNode directory creation and configuration will be
* managed by this class.
*
* @param conf the base configuration to use in starting the servers. This
* will be modified as necessary.
* @param numDataNodes Number of DataNodes to start; may be zero
* @param format if true, format the NameNode and DataNodes before starting up
* @param racks array of strings indicating the rack that each DataNode is on
*/
public MiniDFSCluster(Configuration conf,
int numDataNodes,
boolean format,
String[] racks) throws IOException {
this(0, conf, numDataNodes, format, true, null, racks);
}
/**
* NOTE: if possible, the other constructors should be used as they will
* ensure that the servers use free ports.
* <p>
* Modify the config and start up the servers.
*
* @param nameNodePort suggestion for which rpc port to use. caller should
* use getNameNodePort() to get the actual port used.
* @param conf the base configuration to use in starting the servers. This
* will be modified as necessary.
* @param numDataNodes Number of DataNodes to start; may be zero
* @param format if true, format the NameNode and DataNodes before starting up
* @param manageDfsDirs if true, the data directories for servers will be
* created and dfs.name.dir and dfs.data.dir will be set in the conf
* @param operation the operation with which to start the servers. If null
* or StartupOption.FORMAT, then StartupOption.REGULAR will be used.
* @param racks array of strings indicating the rack that each DataNode is on
*/
public MiniDFSCluster(int nameNodePort,
Configuration conf,
int numDataNodes,
boolean format,
boolean manageDfsDirs,
StartupOption operation,
String[] racks) throws IOException {
this.conf = conf;
base_dir = new File(System.getProperty("test.build.data"), "dfs/");
data_dir = new File(base_dir, "data");
// Setup the NameNode configuration
conf.set("fs.default.name", "localhost:"+ Integer.toString(nameNodePort));
conf.setInt("dfs.info.port", 0);
if (manageDfsDirs) {
conf.set("dfs.name.dir", new File(base_dir, "name1").getPath()+","+
new File(base_dir, "name2").getPath());
}
conf.setInt("dfs.replication", Math.min(3, numDataNodes));
conf.setInt("dfs.safemode.extension", 0);
conf.setInt("dfs.namenode.decommission.interval", 3 * 1000); // 3 second
// Format and clean out DataNode directories
if (format) {
if (data_dir.exists() && !FileUtil.fullyDelete(data_dir)) {
throw new IOException("Cannot remove data directory: " + data_dir);
}
NameNode.format(conf);
}
// Start the NameNode
String[] args = (operation == null ||
operation == StartupOption.FORMAT ||
operation == StartupOption.REGULAR) ?
new String[] {} : new String[] {"-"+operation.toString()};
nameNode = NameNode.createNameNode(args, conf);
// Start the DataNodes
startDataNodes(conf, numDataNodes, manageDfsDirs, operation, racks);
if (numDataNodes > 0) {
while (!isClusterUp()) {
try {
System.err.println("Waiting for the Mini HDFS Cluster to start...");
Thread.sleep(1000);
} catch (InterruptedException e) {
}
}
}
}
/**
* Modify the config and start up the DataNodes. The info port for
* DataNodes is guaranteed to use a free port.
*
* @param conf the base configuration to use in starting the DataNodes. This
* will be modified as necessary.
* @param numDataNodes Number of DataNodes to start; may be zero
* @param manageDfsDirs if true, the data directories for DataNodes will be
* created and dfs.data.dir will be set in the conf
* @param operation the operation with which to start the DataNodes. If null
* or StartupOption.FORMAT, then StartupOption.REGULAR will be used.
* @param racks array of strings indicating the rack that each DataNode is on
*
* @throws IllegalStateException if NameNode has been shutdown
*/
public void startDataNodes(Configuration conf, int numDataNodes,
boolean manageDfsDirs, StartupOption operation,
String[] racks) throws IOException {
if (nameNode == null) {
throw new IllegalStateException("NameNode is not running");
}
// Set up the right ports for the datanodes
conf.setInt("dfs.datanode.info.port", 0);
InetSocketAddress nnAddr = nameNode.getNameNodeAddress();
int nameNodePort = nnAddr.getPort();
conf.set("fs.default.name",
nnAddr.getHostName()+ ":" + Integer.toString(nameNodePort));
String[] args = (operation == null ||
operation == StartupOption.FORMAT ||
operation == StartupOption.REGULAR) ?
null : new String[] {"-"+operation.toString()};
String [] dnArgs = (operation == StartupOption.UPGRADE) ? null : args;
for (int i = 0; i < numDataNodes; i++) {
Configuration dnConf = new Configuration(conf);
if (manageDfsDirs) {
File dir1 = new File(data_dir, "data"+(2*i+1));
File dir2 = new File(data_dir, "data"+(2*i+2));
dir1.mkdirs();
dir2.mkdirs();
if (!dir1.isDirectory() || !dir2.isDirectory()) {
throw new IOException("Mkdirs failed to create directory for DataNode "
+ i + ": " + dir1 + " or " + dir2);
}
dnConf.set("dfs.data.dir", dir1.getPath() + "," + dir2.getPath());
}
if (racks != null && i < racks.length) {
dnConf.set("dfs.datanode.rack", racks[i]);
}
System.out.println("Starting DataNode " + i + " with dfs.data.dir: "
+ dnConf.get("dfs.data.dir"));
dataNodes.add(DataNode.createDataNode(dnArgs, dnConf));
}
}
/**
* If the NameNode is running, attempt to finalize a previous upgrade.
* When this method return, the NameNode should be finalized, but
* DataNodes may not be since that occurs asynchronously.
*
* @throws IllegalStateException if the Namenode is not running.
*/
public void finalizeCluster(Configuration conf) throws Exception {
if (nameNode == null) {
throw new IllegalStateException("Attempting to finalize "
+ "Namenode but it is not running");
}
new DFSAdmin().doMain(conf, new String[] {"-finalizeUpgrade"});
}
/**
* Gets the started NameNode. May be null.
*/
public NameNode getNameNode() {
return nameNode;
}
/**
* Gets a list of the started DataNodes. May be empty.
*/
public ArrayList<DataNode> getDataNodes() {
return dataNodes;
}
/**
* Gets the rpc port used by the NameNode, because the caller
* supplied port is not necessarily the actual port used.
*/
public int getNameNodePort() {
return nameNode.getNameNodeAddress().getPort();
}
/**
* Shut down the servers that are up.
*/
public void shutdown() {
System.out.println("Shutting down the Mini HDFS Cluster");
shutdownDataNodes();
if (nameNode != null) {
nameNode.stop();
nameNode.join();
nameNode = null;
}
}
/**
* Shutdown all DataNodes started by this class. The NameNode
* is left running so that new DataNodes may be started.
*/
public void shutdownDataNodes() {
for (int i = dataNodes.size()-1; i >= 0; i--) {
System.out.println("Shutting down DataNode " + i);
DataNode dn = dataNodes.remove(i);
dn.shutdown();
}
}
/**
* Returns true if the NameNode is running and is out of Safe Mode.
*/
public boolean isClusterUp() {
if (nameNode == null) {
return false;
}
try {
long[] sizes = nameNode.getStats();
boolean isUp = false;
synchronized (this) {
isUp = (!nameNode.isInSafeMode() && sizes[0] != 0);
}
return isUp;
} catch (IOException ie) {
return false;
}
}
/**
* Returns true if there is at least one DataNode running.
*/
public boolean isDataNodeUp() {
if (dataNodes == null || dataNodes.size() == 0) {
return false;
}
return true;
}
/**
* Get a client handle to the DFS cluster.
*/
public FileSystem getFileSystem() throws IOException {
return FileSystem.get(conf);
}
/**
* Get the directories where the namenode stores its state.
*/
public Collection<File> getNameDirs() {
return FSNamesystem.getNamespaceDirs(conf);
}
/**
* Wait until the cluster is active and running.
*/
public void waitActive() throws IOException {
InetSocketAddress addr = new InetSocketAddress("localhost",
getNameNodePort());
DFSClient client = new DFSClient(addr, conf);
//
// get initial state of datanodes
//
DatanodeInfo[] oldinfo = client.datanodeReport();
while (oldinfo.length != dataNodes.size()) {
try {
Thread.sleep(500);
} catch (Exception e) {
}
oldinfo = client.datanodeReport();
}
//
// wait till all datanodes send at least yet another heartbeat
//
int numdead = 0;
while (numdead > 0) {
try {
Thread.sleep(500);
} catch (Exception e) {
}
DatanodeInfo[] info = client.datanodeReport();
if (info.length != dataNodes.size()) {
continue;
}
numdead = 0;
for (int i = 0; i < info.length; i++) {
if (oldinfo[i].getLastUpdate() >= info[i].getLastUpdate()) {
numdead++;
}
}
}
client.close();
}
}