/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;

import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.StaticMapping;
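
/**
 * A {@link MiniDFSCluster} that assigns each DataNode to a node group in
 * addition to a rack, for testing node-group-aware topologies. Node groups
 * must be supplied via {@link #setNodeGroups(String[])} before the
 * constructor runs, because the parent constructor starts the DataNodes.
 *
 * A minimal usage sketch (rack and node group names are illustrative):
 * <pre>
 *   MiniDFSClusterWithNodeGroup.setNodeGroups(
 *       new String[] {"/nodegroup1", "/nodegroup2"});
 *   MiniDFSCluster cluster = new MiniDFSClusterWithNodeGroup(
 *       0, conf, 2, true, true, null,
 *       new String[] {"/rack1", "/rack1"}, null);
 * </pre>
 */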
public class MiniDFSClusterWithNodeGroup extends MiniDFSCluster {
private static String[] NODE_GROUPS = null;
private static final Log LOG = LogFactory.getLog(MiniDFSClusterWithNodeGroup.class);
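
  /**
   * Creates the cluster without explicit host names, passing
   * {@code manageDfsDirs} for both the name and data directories.
   */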
public MiniDFSClusterWithNodeGroup(int nameNodePort,
Configuration conf,
int numDataNodes,
boolean format,
boolean manageDfsDirs,
StartupOption operation,
String[] racks,
long[] simulatedCapacities) throws IOException {
super(nameNodePort, conf, numDataNodes, format, manageDfsDirs,
manageDfsDirs, operation, racks, null, simulatedCapacities);
}
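
  /**
   * Same as above, but with explicit host names for the DataNodes.
   */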
public MiniDFSClusterWithNodeGroup(int nameNodePort,
Configuration conf,
int numDataNodes,
boolean format,
boolean manageDfsDirs,
StartupOption operation,
String[] racks,
String[] hosts,
long[] simulatedCapacities) throws IOException {
super(nameNodePort, conf, numDataNodes, format, manageDfsDirs,
manageDfsDirs, operation, racks, hosts, simulatedCapacities);
}

  /**
   * Sets the node groups to assign to the DataNodes. Must be called before
   * the constructor runs, since the parent constructor starts the DataNodes.
   */
public static void setNodeGroups(String[] nodeGroups) {
NODE_GROUPS = nodeGroups;
}
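
  /**
   * Starts {@code numDataNodes} DataNodes, registering each one under a node
   * group as well as a rack. Mirrors the parent implementation, except that
   * when {@code nodeGroups} is non-null the node group is appended to the
   * rack to form the node's full network location.
   */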
public synchronized void startDataNodes(Configuration conf, int numDataNodes,
boolean manageDfsDirs, StartupOption operation,
String[] racks, String[] nodeGroups, String[] hosts,
long[] simulatedCapacities) throws IOException {
conf.set("slave.host.name", "127.0.0.1");
int curDatanodesNum = dataNodes.size();
    // In the minicluster, default the initial block report delay to 0.
if (conf.get(DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY) == null) {
conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY, 0);
}
    // If the minicluster's name node is null, assume the conf is already set
    // with the correct name node address:port.
if (nameNode != null) { // set conf from the name node
InetSocketAddress nnAddr = nameNode.getNameNodeAddress();
int nameNodePort = nnAddr.getPort();
FileSystem.setDefaultUri(conf,
"hdfs://" +
nameNode.getNamenodeHostName(conf, nnAddr) +
":" + Integer.toString(nameNodePort));
}
if (racks != null && numDataNodes > racks.length ) {
throw new IllegalArgumentException( "The length of racks [" + racks.length
+ "] is less than the number of datanodes [" + numDataNodes + "].");
}
if (nodeGroups != null && numDataNodes > nodeGroups.length ) {
throw new IllegalArgumentException( "The length of nodeGroups [" + nodeGroups.length
+ "] is less than the number of datanodes [" + numDataNodes + "].");
}
if (hosts != null && numDataNodes > hosts.length ) {
throw new IllegalArgumentException( "The length of hosts [" + hosts.length
+ "] is less than the number of datanodes [" + numDataNodes + "].");
}
    // Generate host names if racks were given but hosts were not.
if (racks != null && hosts == null) {
hosts = new String[numDataNodes];
for (int i = curDatanodesNum; i < curDatanodesNum + numDataNodes; i++) {
hosts[i - curDatanodesNum] = "host" + i + ".foo.com";
}
}
if (simulatedCapacities != null
&& numDataNodes > simulatedCapacities.length) {
throw new IllegalArgumentException( "The length of simulatedCapacities ["
+ simulatedCapacities.length
+ "] is less than the number of datanodes [" + numDataNodes + "].");
}
// Set up the right ports for the datanodes
conf.set("dfs.datanode.address", "127.0.0.1:0");
conf.set("dfs.datanode.http.address", "127.0.0.1:0");
conf.set("dfs.datanode.ipc.address", "127.0.0.1:0");
    String[] dnArgs = (operation != StartupOption.ROLLBACK) ? null
        : new String[] {operation.getName()};
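    // Start each DataNode with its own copy of the configuration.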
for (int i = curDatanodesNum; i < curDatanodesNum+numDataNodes; i++) {
Configuration dnConf = new Configuration(conf);
if (manageDfsDirs) {
File dir1 = new File(data_dir, "data"+(2*i+1));
File dir2 = new File(data_dir, "data"+(2*i+2));
dir1.mkdirs();
dir2.mkdirs();
if (!dir1.isDirectory() || !dir2.isDirectory()) {
throw new IOException("Mkdirs failed to create directory for DataNode "
+ i + ": " + dir1 + " or " + dir2);
}
// Set default permissions on data dirs as not all platforms have the
// same defaults
FileUtil.setPermission(dir1, new FsPermission(
conf.get("dfs.datanode.data.dir.perm",
DataNode.DEFAULT_DATA_DIR_PERMISSION)));
FileUtil.setPermission(dir2, new FsPermission(
conf.get("dfs.datanode.data.dir.perm",
DataNode.DEFAULT_DATA_DIR_PERMISSION)));
dnConf.set(DataNode.DATA_DIR_KEY, dir1.getPath() + "," + dir2.getPath());
}
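      // Use the simulated (in-memory) dataset instead of on-disk storage,
      // capped at the requested capacity.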
if (simulatedCapacities != null) {
dnConf.setBoolean("dfs.datanode.simulateddatastorage", true);
dnConf.setLong(SimulatedFSDataset.CONFIG_PROPERTY_CAPACITY,
simulatedCapacities[i-curDatanodesNum]);
}
LOG.info("Starting DataNode " + i + " with "
+ DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY + ": "
+ dnConf.get(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY));
if (hosts != null) {
dnConf.set("slave.host.name", hosts[i - curDatanodesNum]);
LOG.info("Starting DataNode " + i + " with hostname set to: "
+ dnConf.get("slave.host.name"));
}
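      // Register the host name's network location. With node groups, the
      // location is the rack and node group concatenated, e.g.
      // "/rack1" + "/nodegroup1" becomes "/rack1/nodegroup1".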
if (racks != null) {
String name = hosts[i - curDatanodesNum];
if (nodeGroups == null) {
LOG.info("Adding node with hostname : " + name + " to rack " +
racks[i-curDatanodesNum]);
StaticMapping.addNodeToRack(name,racks[i-curDatanodesNum]);
} else {
LOG.info("Adding node with hostname : " + name + " to serverGroup " +
nodeGroups[i-curDatanodesNum] + " and rack " +
racks[i-curDatanodesNum]);
StaticMapping.addNodeToRack(name,racks[i-curDatanodesNum] +
nodeGroups[i-curDatanodesNum]);
}
}
      Configuration newconf = new Configuration(dnConf); // save config for restart
if (hosts != null) {
NetUtils.addStaticResolution(hosts[i - curDatanodesNum], "localhost");
}
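      // Create the DataNode; its ephemeral ports are bound here, so the
      // actual IP:port is known before the daemon threads start below.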
DataNode dn = DataNode.instantiateDataNode(dnArgs, dnConf);
      if (dn == null) {
        throw new IOException("Cannot start DataNode in "
            + dnConf.get(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY));
      }
      // HDFS identifies DataNodes by IP:port, so also map the IP:port to the
      // rack (and node group).
String ipAddr = dn.getSelfAddr().getAddress().getHostAddress();
if (racks != null) {
int port = dn.getSelfAddr().getPort();
if (nodeGroups == null) {
LOG.info("Adding node with IP:port : " + ipAddr + ":" + port +
" to rack " + racks[i-curDatanodesNum]);
StaticMapping.addNodeToRack(ipAddr + ":" + port,
racks[i-curDatanodesNum]);
} else {
LOG.info("Adding node with IP:port : " + ipAddr + ":" + port + " to nodeGroup " +
nodeGroups[i-curDatanodesNum] + " and rack " + racks[i-curDatanodesNum]);
StaticMapping.addNodeToRack(ipAddr + ":" + port, racks[i-curDatanodesNum] +
nodeGroups[i-curDatanodesNum]);
}
}
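      // Start the DataNode's service threads and record it so the cluster
      // can restart or shut it down later.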
DataNode.runDatanodeDaemon(dn);
dataNodes.add(new DataNodeProperties(dn, newconf, dnArgs));
}
curDatanodesNum += numDataNodes;
this.numDataNodes += numDataNodes;
waitActive();
}
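
  /**
   * Variant that accepts a {@code setupHostsFile} flag; this implementation
   * currently ignores the flag.
   */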
public synchronized void startDataNodes(Configuration conf, int numDataNodes,
boolean manageDfsDirs, StartupOption operation,
String[] racks, String[] nodeGroups, String[] hosts,
long[] simulatedCapacities,
boolean setupHostsFile) throws IOException {
startDataNodes(conf, numDataNodes, manageDfsDirs, operation, racks, nodeGroups,
hosts, simulatedCapacities);
}

  // Invoked from the parent class during initialization; injects the
  // statically configured NODE_GROUPS.
@Override
public synchronized void startDataNodes(Configuration conf, int numDataNodes,
boolean manageDfsDirs, StartupOption operation,
String[] racks, String[] hosts,
long[] simulatedCapacities) throws IOException {
startDataNodes(conf, numDataNodes, manageDfsDirs, operation, racks, NODE_GROUPS, hosts,
simulatedCapacities);
}
}