| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hdfs; |
| |
| import static org.apache.hadoop.hdfs.server.common.Util.fileAsURI; |
| |
| import java.io.File; |
| import java.io.FileWriter; |
| import java.io.IOException; |
| import java.io.PrintWriter; |
| import java.io.RandomAccessFile; |
| import java.net.InetSocketAddress; |
| import java.net.ServerSocket; |
| import java.net.URI; |
| import java.net.URISyntaxException; |
| import java.nio.channels.FileChannel; |
| import java.security.PrivilegedExceptionAction; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Random; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.CommonConfigurationKeys; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.FileUtil; |
| import org.apache.hadoop.hdfs.protocol.Block; |
| import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; |
| import org.apache.hadoop.hdfs.protocol.ClientProtocol; |
| import org.apache.hadoop.hdfs.protocol.DatanodeInfo; |
| import org.apache.hadoop.hdfs.protocol.ExtendedBlock; |
| import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType; |
| import org.apache.hadoop.hdfs.server.common.Storage; |
| import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption; |
| import org.apache.hadoop.hdfs.server.datanode.DataNode; |
| import org.apache.hadoop.hdfs.server.datanode.DataStorage; |
| import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface; |
| import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset; |
| import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; |
| import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; |
| import org.apache.hadoop.hdfs.server.namenode.NameNode; |
| import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; |
| import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; |
| import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; |
| import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol; |
| import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; |
| import org.apache.hadoop.hdfs.tools.DFSAdmin; |
| import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; |
| import org.apache.hadoop.net.DNSToSwitchMapping; |
| import org.apache.hadoop.net.NetUtils; |
| import org.apache.hadoop.net.StaticMapping; |
| import org.apache.hadoop.security.RefreshUserMappingsProtocol; |
| import org.apache.hadoop.security.UserGroupInformation; |
| import org.apache.hadoop.security.authorize.ProxyUsers; |
| import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol; |
| import org.apache.hadoop.test.GenericTestUtils; |
| import org.apache.hadoop.tools.GetUserMappingsProtocol; |
| import org.apache.hadoop.util.StringUtils; |
| import org.apache.hadoop.util.ToolRunner; |
| |
| /** |
 * This class creates a single-process DFS cluster for JUnit testing.
| * The data directories for non-simulated DFS are under the testing directory. |
| * For simulated data nodes, no underlying fs storage is used. |
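 * <p>
 * Instances are normally created through the nested {@link Builder}; the
 * multi-argument constructors below are deprecated in favor of the builder.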
| */ |
| public class MiniDFSCluster { |
| |
| private static final String NAMESERVICE_ID_PREFIX = "nameserviceId"; |
| private static final Log LOG = LogFactory.getLog(MiniDFSCluster.class); |
| |
| static { DefaultMetricsSystem.setMiniClusterMode(true); } |
| |
| /** |
| * Class to construct instances of MiniDFSClusters with specific options. |
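   * <p>
   * A typical usage sketch (the option values shown are illustrative only,
   * not required settings):
   * <pre>{@code
   *   Configuration conf = new HdfsConfiguration();
   *   MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
   *       .numDataNodes(3)
   *       .build();
   *   try {
   *     FileSystem fs = cluster.getFileSystem();
   *     // run the test against fs ...
   *   } finally {
   *     cluster.shutdown();
   *   }
   * }</pre>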
| */ |
| public static class Builder { |
| private int nameNodePort = 0; |
| private final Configuration conf; |
| private int numNameNodes = 1; |
| private int numDataNodes = 1; |
| private boolean format = true; |
| private boolean manageNameDfsDirs = true; |
| private boolean manageDataDfsDirs = true; |
| private StartupOption option = null; |
| private String[] racks = null; |
| private String [] hosts = null; |
| private long [] simulatedCapacities = null; |
| private String clusterId = null; |
| private boolean waitSafeMode = true; |
| private boolean setupHostsFile = false; |
| private boolean federation = false; |
| |
| public Builder(Configuration conf) { |
| this.conf = conf; |
| } |
| |
| /** |
| * default false - non federated cluster |
| * @param val |
| * @return Builder object |
| */ |
| public Builder federation (boolean val){ |
| this.federation = val; |
| return this; |
| } |
| /** |
| * Default: 0 |
| */ |
| public Builder nameNodePort(int val) { |
| this.nameNodePort = val; |
| return this; |
| } |
| |
| /** |
| * Default: 1 |
| */ |
| public Builder numNameNodes(int val) { |
| this.numNameNodes = val; |
| return this; |
| } |
| |
| /** |
| * Default: 1 |
| */ |
| public Builder numDataNodes(int val) { |
| this.numDataNodes = val; |
| return this; |
| } |
| |
| /** |
| * Default: true |
| */ |
| public Builder format(boolean val) { |
| this.format = val; |
| return this; |
| } |
| |
| /** |
| * Default: true |
| */ |
| public Builder manageNameDfsDirs(boolean val) { |
| this.manageNameDfsDirs = val; |
| return this; |
| } |
| |
| /** |
| * Default: true |
| */ |
| public Builder manageDataDfsDirs(boolean val) { |
| this.manageDataDfsDirs = val; |
| return this; |
| } |
| |
| /** |
| * Default: null |
| */ |
| public Builder startupOption(StartupOption val) { |
| this.option = val; |
| return this; |
| } |
| |
| /** |
| * Default: null |
| */ |
| public Builder racks(String[] val) { |
| this.racks = val; |
| return this; |
| } |
| |
| /** |
| * Default: null |
| */ |
| public Builder hosts(String[] val) { |
| this.hosts = val; |
| return this; |
| } |
| |
| /** |
| * Default: null |
| */ |
| public Builder simulatedCapacities(long[] val) { |
| this.simulatedCapacities = val; |
| return this; |
| } |
| |
| /** |
| * Default: true |
| */ |
| public Builder waitSafeMode(boolean val) { |
| this.waitSafeMode = val; |
| return this; |
| } |
| |
| /** |
| * Default: null |
| */ |
| public Builder clusterId(String cid) { |
| this.clusterId = cid; |
| return this; |
| } |
| |
| /** |
| * Default: false |
     * When true, the hosts file/include file for the cluster is set up.
| */ |
| public Builder setupHostsFile(boolean val) { |
| this.setupHostsFile = val; |
| return this; |
| } |
| |
| /** |
| * Construct the actual MiniDFSCluster |
| */ |
| public MiniDFSCluster build() throws IOException { |
| return new MiniDFSCluster(this); |
| } |
| } |
| |
| /** |
| * Used by builder to create and return an instance of MiniDFSCluster |
| */ |
| private MiniDFSCluster(Builder builder) throws IOException { |
| LOG.info("starting cluster with " + builder.numNameNodes + " namenodes."); |
| nameNodes = new NameNodeInfo[builder.numNameNodes]; |
| // try to determine if in federation mode |
| if(builder.numNameNodes > 1) |
| builder.federation = true; |
| |
| initMiniDFSCluster(builder.nameNodePort, |
| builder.conf, |
| builder.numDataNodes, |
| builder.format, |
| builder.manageNameDfsDirs, |
| builder.manageDataDfsDirs, |
| builder.option, |
| builder.racks, |
| builder.hosts, |
| builder.simulatedCapacities, |
| builder.clusterId, |
| builder.waitSafeMode, |
| builder.setupHostsFile, |
| builder.federation); |
| } |
| |
| public class DataNodeProperties { |
| DataNode datanode; |
| Configuration conf; |
| String[] dnArgs; |
| |
| DataNodeProperties(DataNode node, Configuration conf, String[] args) { |
| this.datanode = node; |
| this.conf = conf; |
| this.dnArgs = args; |
| } |
| } |
| |
| private Configuration conf; |
| private NameNodeInfo[] nameNodes; |
| private int numDataNodes; |
| private ArrayList<DataNodeProperties> dataNodes = |
| new ArrayList<DataNodeProperties>(); |
| private File base_dir; |
| private File data_dir; |
| private boolean federation = false; |
| private boolean waitSafeMode = true; |
| |
| /** |
| * Stores the information related to a namenode in the cluster |
| */ |
| static class NameNodeInfo { |
| final NameNode nameNode; |
| final Configuration conf; |
| NameNodeInfo(NameNode nn, Configuration conf) { |
| this.nameNode = nn; |
| this.conf = conf; |
| } |
| } |
| |
| /** |
   * This no-argument constructor is used only when wishing to start a data node cluster
   * without a name node (i.e., when the name node is started elsewhere).
| */ |
| public MiniDFSCluster() { |
| nameNodes = new NameNodeInfo[0]; // No namenode in the cluster |
| } |
| |
| /** |
| * Modify the config and start up the servers with the given operation. |
| * Servers will be started on free ports. |
| * <p> |
| * The caller must manage the creation of NameNode and DataNode directories |
| * and have already set dfs.namenode.name.dir and dfs.datanode.data.dir in the given conf. |
| * |
| * @param conf the base configuration to use in starting the servers. This |
| * will be modified as necessary. |
| * @param numDataNodes Number of DataNodes to start; may be zero |
| * @param nameNodeOperation the operation with which to start the servers. If null |
| * or StartupOption.FORMAT, then StartupOption.REGULAR will be used. |
| */ |
| @Deprecated // in 22 to be removed in 24. Use MiniDFSCluster.Builder instead |
| public MiniDFSCluster(Configuration conf, |
| int numDataNodes, |
| StartupOption nameNodeOperation) throws IOException { |
| this(0, conf, numDataNodes, false, false, false, nameNodeOperation, |
| null, null, null); |
| } |
| |
| /** |
| * Modify the config and start up the servers. The rpc and info ports for |
| * servers are guaranteed to use free ports. |
| * <p> |
| * NameNode and DataNode directory creation and configuration will be |
| * managed by this class. |
| * |
| * @param conf the base configuration to use in starting the servers. This |
| * will be modified as necessary. |
| * @param numDataNodes Number of DataNodes to start; may be zero |
| * @param format if true, format the NameNode and DataNodes before starting up |
| * @param racks array of strings indicating the rack that each DataNode is on |
| */ |
| @Deprecated // in 22 to be removed in 24. Use MiniDFSCluster.Builder instead |
| public MiniDFSCluster(Configuration conf, |
| int numDataNodes, |
| boolean format, |
| String[] racks) throws IOException { |
| this(0, conf, numDataNodes, format, true, true, null, racks, null, null); |
| } |
| |
| /** |
| * Modify the config and start up the servers. The rpc and info ports for |
| * servers are guaranteed to use free ports. |
| * <p> |
| * NameNode and DataNode directory creation and configuration will be |
| * managed by this class. |
| * |
| * @param conf the base configuration to use in starting the servers. This |
| * will be modified as necessary. |
| * @param numDataNodes Number of DataNodes to start; may be zero |
| * @param format if true, format the NameNode and DataNodes before starting up |
| * @param racks array of strings indicating the rack that each DataNode is on |
| * @param hosts array of strings indicating the hostname for each DataNode |
| */ |
| @Deprecated // in 22 to be removed in 24. Use MiniDFSCluster.Builder instead |
| public MiniDFSCluster(Configuration conf, |
| int numDataNodes, |
| boolean format, |
| String[] racks, String[] hosts) throws IOException { |
| this(0, conf, numDataNodes, format, true, true, null, racks, hosts, null); |
| } |
| |
| /** |
   * NOTE: if possible, the other constructors that don't have a nameNode port
   * parameter should be used, as they will ensure that the servers use free ports.
| * <p> |
| * Modify the config and start up the servers. |
| * |
| * @param nameNodePort suggestion for which rpc port to use. caller should |
| * use getNameNodePort() to get the actual port used. |
| * @param conf the base configuration to use in starting the servers. This |
| * will be modified as necessary. |
| * @param numDataNodes Number of DataNodes to start; may be zero |
| * @param format if true, format the NameNode and DataNodes before starting up |
| * @param manageDfsDirs if true, the data directories for servers will be |
| * created and dfs.namenode.name.dir and dfs.datanode.data.dir will be set in the conf |
| * @param operation the operation with which to start the servers. If null |
| * or StartupOption.FORMAT, then StartupOption.REGULAR will be used. |
| * @param racks array of strings indicating the rack that each DataNode is on |
| */ |
| @Deprecated // in 22 to be removed in 24. Use MiniDFSCluster.Builder instead |
| public MiniDFSCluster(int nameNodePort, |
| Configuration conf, |
| int numDataNodes, |
| boolean format, |
| boolean manageDfsDirs, |
| StartupOption operation, |
| String[] racks) throws IOException { |
| this(nameNodePort, conf, numDataNodes, format, manageDfsDirs, manageDfsDirs, |
| operation, racks, null, null); |
| } |
| |
| /** |
   * NOTE: if possible, the other constructors that don't have a nameNode port
   * parameter should be used, as they will ensure that the servers use free ports.
| * <p> |
| * Modify the config and start up the servers. |
| * |
| * @param nameNodePort suggestion for which rpc port to use. caller should |
| * use getNameNodePort() to get the actual port used. |
| * @param conf the base configuration to use in starting the servers. This |
| * will be modified as necessary. |
| * @param numDataNodes Number of DataNodes to start; may be zero |
| * @param format if true, format the NameNode and DataNodes before starting up |
| * @param manageDfsDirs if true, the data directories for servers will be |
| * created and dfs.namenode.name.dir and dfs.datanode.data.dir will be set in the conf |
| * @param operation the operation with which to start the servers. If null |
| * or StartupOption.FORMAT, then StartupOption.REGULAR will be used. |
| * @param racks array of strings indicating the rack that each DataNode is on |
| * @param simulatedCapacities array of capacities of the simulated data nodes |
| */ |
| @Deprecated // in 22 to be removed in 24. Use MiniDFSCluster.Builder instead |
| public MiniDFSCluster(int nameNodePort, |
| Configuration conf, |
| int numDataNodes, |
| boolean format, |
| boolean manageDfsDirs, |
| StartupOption operation, |
| String[] racks, |
| long[] simulatedCapacities) throws IOException { |
| this(nameNodePort, conf, numDataNodes, format, manageDfsDirs, manageDfsDirs, |
| operation, racks, null, simulatedCapacities); |
| } |
| |
| /** |
   * NOTE: if possible, the other constructors that don't have a nameNode port
   * parameter should be used, as they will ensure that the servers use free ports.
| * <p> |
| * Modify the config and start up the servers. |
| * |
| * @param nameNodePort suggestion for which rpc port to use. caller should |
| * use getNameNodePort() to get the actual port used. |
| * @param conf the base configuration to use in starting the servers. This |
| * will be modified as necessary. |
| * @param numDataNodes Number of DataNodes to start; may be zero |
| * @param format if true, format the NameNode and DataNodes before starting up |
   * @param manageNameDfsDirs if true, the name node directories will be created
   *          and dfs.namenode.name.dir and dfs.namenode.checkpoint.dir will be
   *          set in the conf
| * @param manageDataDfsDirs if true, the data directories for datanodes will |
| * be created and dfs.datanode.data.dir set to same in the conf |
| * @param operation the operation with which to start the servers. If null |
| * or StartupOption.FORMAT, then StartupOption.REGULAR will be used. |
| * @param racks array of strings indicating the rack that each DataNode is on |
| * @param hosts array of strings indicating the hostnames of each DataNode |
| * @param simulatedCapacities array of capacities of the simulated data nodes |
| */ |
| @Deprecated // in 22 to be removed in 24. Use MiniDFSCluster.Builder instead |
| public MiniDFSCluster(int nameNodePort, |
| Configuration conf, |
| int numDataNodes, |
| boolean format, |
| boolean manageNameDfsDirs, |
| boolean manageDataDfsDirs, |
| StartupOption operation, |
| String[] racks, String hosts[], |
| long[] simulatedCapacities) throws IOException { |
| this.nameNodes = new NameNodeInfo[1]; // Single namenode in the cluster |
| initMiniDFSCluster(nameNodePort, conf, numDataNodes, format, |
| manageNameDfsDirs, manageDataDfsDirs, operation, racks, hosts, |
| simulatedCapacities, null, true, false, false); |
| } |
| |
| private void initMiniDFSCluster(int nameNodePort, Configuration conf, |
| int numDataNodes, boolean format, boolean manageNameDfsDirs, |
| boolean manageDataDfsDirs, StartupOption operation, String[] racks, |
| String[] hosts, long[] simulatedCapacities, String clusterId, |
| boolean waitSafeMode, boolean setupHostsFile, boolean federation) |
| throws IOException { |
| this.conf = conf; |
| base_dir = new File(getBaseDirectory()); |
| data_dir = new File(base_dir, "data"); |
| this.federation = federation; |
| this.waitSafeMode = waitSafeMode; |
| |
| // use alternate RPC engine if spec'd |
| String rpcEngineName = System.getProperty("hdfs.rpc.engine"); |
| if (rpcEngineName != null && !"".equals(rpcEngineName)) { |
| |
| System.out.println("HDFS using RPCEngine: "+rpcEngineName); |
| try { |
| Class<?> rpcEngine = conf.getClassByName(rpcEngineName); |
| setRpcEngine(conf, NamenodeProtocols.class, rpcEngine); |
| setRpcEngine(conf, NamenodeProtocol.class, rpcEngine); |
| setRpcEngine(conf, ClientProtocol.class, rpcEngine); |
| setRpcEngine(conf, DatanodeProtocol.class, rpcEngine); |
| setRpcEngine(conf, RefreshAuthorizationPolicyProtocol.class, rpcEngine); |
| setRpcEngine(conf, RefreshUserMappingsProtocol.class, rpcEngine); |
| setRpcEngine(conf, GetUserMappingsProtocol.class, rpcEngine); |
| } catch (ClassNotFoundException e) { |
| throw new RuntimeException(e); |
| } |
| |
| // disable service authorization, as it does not work with tunnelled RPC |
| conf.setBoolean(CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, |
| false); |
| } |
| |
| int replication = conf.getInt(DFSConfigKeys.DFS_REPLICATION_KEY, 3); |
| conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, Math.min(replication, numDataNodes)); |
| conf.setInt(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_EXTENSION_KEY, 0); |
    conf.setInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY, 3); // 3 seconds
| conf.setClass(DFSConfigKeys.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, |
| StaticMapping.class, DNSToSwitchMapping.class); |
| |
| Collection<String> nameserviceIds = DFSUtil.getNameServiceIds(conf); |
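    // More than one configured nameservice id implies a federated cluster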
| if(nameserviceIds.size() > 1) |
| federation = true; |
| |
| if (!federation) { |
| conf.set(DFSConfigKeys.FS_DEFAULT_NAME_KEY, "127.0.0.1:" + nameNodePort); |
| conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "127.0.0.1:0"); |
| NameNode nn = createNameNode(0, conf, numDataNodes, manageNameDfsDirs, |
| format, operation, clusterId); |
| nameNodes[0] = new NameNodeInfo(nn, conf); |
| FileSystem.setDefaultUri(conf, getURI(0)); |
| } else { |
| if (nameserviceIds.isEmpty()) { |
| for (int i = 0; i < nameNodes.length; i++) { |
| nameserviceIds.add(NAMESERVICE_ID_PREFIX + i); |
| } |
| } |
| initFederationConf(conf, nameserviceIds, numDataNodes, nameNodePort); |
| createFederationNamenodes(conf, nameserviceIds, manageNameDfsDirs, format, |
| operation, clusterId); |
| } |
| |
| if (format) { |
| if (data_dir.exists() && !FileUtil.fullyDelete(data_dir)) { |
| throw new IOException("Cannot remove data directory: " + data_dir); |
| } |
| } |
| |
| // Start the DataNodes |
| startDataNodes(conf, numDataNodes, manageDataDfsDirs, operation, racks, |
| hosts, simulatedCapacities, setupHostsFile); |
| waitClusterUp(); |
| //make sure ProxyUsers uses the latest conf |
| ProxyUsers.refreshSuperUserGroupsConfiguration(conf); |
| } |
| |
| /** Initialize configuration for federated cluster */ |
| private static void initFederationConf(Configuration conf, |
| Collection<String> nameserviceIds, int numDataNodes, int nnPort) { |
| String nameserviceIdList = ""; |
| for (String nameserviceId : nameserviceIds) { |
| // Create comma separated list of nameserviceIds |
| if (nameserviceIdList.length() > 0) { |
| nameserviceIdList += ","; |
| } |
| nameserviceIdList += nameserviceId; |
| initFederatedNamenodeAddress(conf, nameserviceId, nnPort); |
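      // advance the suggested port for the next namenode; 0 means use any free port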
| nnPort = nnPort == 0 ? 0 : nnPort + 2; |
| } |
| conf.set(DFSConfigKeys.DFS_FEDERATION_NAMESERVICES, nameserviceIdList); |
| } |
| |
| /* For federated namenode initialize the address:port */ |
| private static void initFederatedNamenodeAddress(Configuration conf, |
| String nameserviceId, int nnPort) { |
| // Set nameserviceId specific key |
| String key = DFSUtil.getNameServiceIdKey( |
| DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, nameserviceId); |
| conf.set(key, "127.0.0.1:0"); |
| |
| key = DFSUtil.getNameServiceIdKey( |
| DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY, nameserviceId); |
| conf.set(key, "127.0.0.1:" + nnPort); |
| } |
| |
| private void createFederationNamenodes(Configuration conf, |
| Collection<String> nameserviceIds, boolean manageNameDfsDirs, |
| boolean format, StartupOption operation, String clusterId) |
| throws IOException { |
| // Create namenodes in the cluster |
| int nnCounter = 0; |
| for (String nameserviceId : nameserviceIds) { |
| createFederatedNameNode(nnCounter++, conf, numDataNodes, manageNameDfsDirs, |
| format, operation, clusterId, nameserviceId); |
| } |
| } |
| |
| private NameNode createNameNode(int nnIndex, Configuration conf, |
| int numDataNodes, boolean manageNameDfsDirs, boolean format, |
| StartupOption operation, String clusterId) |
| throws IOException { |
| if (manageNameDfsDirs) { |
| conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY, |
| fileAsURI(new File(base_dir, "name" + (2*nnIndex + 1)))+","+ |
| fileAsURI(new File(base_dir, "name" + (2*nnIndex + 2)))); |
| conf.set(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_DIR_KEY, |
| fileAsURI(new File(base_dir, "namesecondary" + (2*nnIndex + 1)))+","+ |
| fileAsURI(new File(base_dir, "namesecondary" + (2*nnIndex + 2)))); |
| } |
| |
| // Format and clean out DataNode directories |
| if (format) { |
| DFSTestUtil.formatNameNode(conf); |
| } |
| if (operation == StartupOption.UPGRADE){ |
| operation.setClusterId(clusterId); |
| } |
| |
| // Start the NameNode |
| String[] args = (operation == null || |
| operation == StartupOption.FORMAT || |
| operation == StartupOption.REGULAR) ? |
| new String[] {} : new String[] {operation.getName()}; |
| return NameNode.createNameNode(args, conf); |
| } |
| |
| private void createFederatedNameNode(int nnIndex, Configuration conf, |
| int numDataNodes, boolean manageNameDfsDirs, boolean format, |
| StartupOption operation, String clusterId, String nameserviceId) |
| throws IOException { |
| conf.set(DFSConfigKeys.DFS_FEDERATION_NAMESERVICE_ID, nameserviceId); |
| NameNode nn = createNameNode(nnIndex, conf, numDataNodes, manageNameDfsDirs, |
| format, operation, clusterId); |
| conf.set(DFSUtil.getNameServiceIdKey( |
| DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY, nameserviceId), NameNode |
| .getHostPortString(nn.getNameNodeAddress())); |
| conf.set(DFSUtil.getNameServiceIdKey( |
| DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, nameserviceId), NameNode |
| .getHostPortString(nn.getHttpAddress())); |
| DFSUtil.setGenericConf(conf, nameserviceId, |
| DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY); |
| nameNodes[nnIndex] = new NameNodeInfo(nn, new Configuration(conf)); |
| } |
| |
| private void setRpcEngine(Configuration conf, Class<?> protocol, Class<?> engine) { |
| conf.setClass("rpc.engine."+protocol.getName(), engine, Object.class); |
| } |
| |
| /** |
| * @return URI of the namenode from a single namenode MiniDFSCluster |
| */ |
| public URI getURI() { |
| checkSingleNameNode(); |
| return getURI(0); |
| } |
| |
| /** |
| * @return URI of the given namenode in MiniDFSCluster |
| */ |
| public URI getURI(int nnIndex) { |
| InetSocketAddress addr = nameNodes[nnIndex].nameNode.getNameNodeAddress(); |
| String hostPort = NameNode.getHostPortString(addr); |
| URI uri = null; |
| try { |
| uri = new URI("hdfs://" + hostPort); |
| } catch (URISyntaxException e) { |
| NameNode.LOG.warn("unexpected URISyntaxException: " + e ); |
| } |
| return uri; |
| } |
| |
| /** |
   * @return Configuration for the given namenode
| */ |
| public Configuration getConfiguration(int nnIndex) { |
| return nameNodes[nnIndex].conf; |
| } |
| |
| /** |
   * Wait for the given namenode to get out of safe mode.
| */ |
| public void waitNameNodeUp(int nnIndex) { |
| while (!isNameNodeUp(nnIndex)) { |
| try { |
| LOG.warn("Waiting for namenode at " + nnIndex + " to start..."); |
| Thread.sleep(1000); |
| } catch (InterruptedException e) { |
| } |
| } |
| } |
| |
| /** |
   * Wait for the cluster to get out of safe mode.
| */ |
| public void waitClusterUp() { |
| if (numDataNodes > 0) { |
| while (!isClusterUp()) { |
| try { |
| LOG.warn("Waiting for the Mini HDFS Cluster to start..."); |
| Thread.sleep(1000); |
| } catch (InterruptedException e) { |
| } |
| } |
| } |
| } |
| |
| /** |
| * Modify the config and start up additional DataNodes. The info port for |
| * DataNodes is guaranteed to use a free port. |
| * |
   * Data nodes can run with the name node in the mini cluster or
   * against a real name node, which is useful, for example, when running
   * simulated data nodes against a real name node.
   * If the mini cluster's name node is null, the conf is assumed to already
   * contain the correct address:port of the name node.
| * |
| * @param conf the base configuration to use in starting the DataNodes. This |
| * will be modified as necessary. |
| * @param numDataNodes Number of DataNodes to start; may be zero |
| * @param manageDfsDirs if true, the data directories for DataNodes will be |
| * created and dfs.datanode.data.dir will be set in the conf |
| * @param operation the operation with which to start the DataNodes. If null |
| * or StartupOption.FORMAT, then StartupOption.REGULAR will be used. |
| * @param racks array of strings indicating the rack that each DataNode is on |
| * @param hosts array of strings indicating the hostnames for each DataNode |
| * @param simulatedCapacities array of capacities of the simulated data nodes |
| * |
| * @throws IllegalStateException if NameNode has been shutdown |
| */ |
| public synchronized void startDataNodes(Configuration conf, int numDataNodes, |
| boolean manageDfsDirs, StartupOption operation, |
| String[] racks, String[] hosts, |
| long[] simulatedCapacities) throws IOException { |
| startDataNodes(conf, numDataNodes, manageDfsDirs, operation, racks, |
| hosts, simulatedCapacities, false); |
| } |
| |
| /** |
| * Modify the config and start up additional DataNodes. The info port for |
| * DataNodes is guaranteed to use a free port. |
| * |
   * Data nodes can run with the name node in the mini cluster or
   * against a real name node, which is useful, for example, when running
   * simulated data nodes against a real name node.
   * If the mini cluster's name node is null, the conf is assumed to already
   * contain the correct address:port of the name node.
| * |
| * @param conf the base configuration to use in starting the DataNodes. This |
| * will be modified as necessary. |
| * @param numDataNodes Number of DataNodes to start; may be zero |
| * @param manageDfsDirs if true, the data directories for DataNodes will be |
| * created and dfs.datanode.data.dir will be set in the conf |
| * @param operation the operation with which to start the DataNodes. If null |
| * or StartupOption.FORMAT, then StartupOption.REGULAR will be used. |
| * @param racks array of strings indicating the rack that each DataNode is on |
| * @param hosts array of strings indicating the hostnames for each DataNode |
| * @param simulatedCapacities array of capacities of the simulated data nodes |
| * @param setupHostsFile add new nodes to dfs hosts files |
| * |
| * @throws IllegalStateException if NameNode has been shutdown |
| */ |
| public synchronized void startDataNodes(Configuration conf, int numDataNodes, |
| boolean manageDfsDirs, StartupOption operation, |
| String[] racks, String[] hosts, |
| long[] simulatedCapacities, |
| boolean setupHostsFile) throws IOException { |
| |
| int curDatanodesNum = dataNodes.size(); |
    // for the minicluster, the default initialDelay for block reports is 0
| if (conf.get(DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY) == null) { |
| conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY, 0); |
| } |
| // If minicluster's name node is null assume that the conf has been |
| // set with the right address:port of the name node. |
| // |
| if (racks != null && numDataNodes > racks.length ) { |
| throw new IllegalArgumentException( "The length of racks [" + racks.length |
| + "] is less than the number of datanodes [" + numDataNodes + "]."); |
| } |
| if (hosts != null && numDataNodes > hosts.length ) { |
| throw new IllegalArgumentException( "The length of hosts [" + hosts.length |
| + "] is less than the number of datanodes [" + numDataNodes + "]."); |
| } |
| //Generate some hostnames if required |
| if (racks != null && hosts == null) { |
| hosts = new String[numDataNodes]; |
| for (int i = curDatanodesNum; i < curDatanodesNum + numDataNodes; i++) { |
| hosts[i - curDatanodesNum] = "host" + i + ".foo.com"; |
| } |
| } |
| |
| if (simulatedCapacities != null |
| && numDataNodes > simulatedCapacities.length) { |
| throw new IllegalArgumentException( "The length of simulatedCapacities [" |
| + simulatedCapacities.length |
| + "] is less than the number of datanodes [" + numDataNodes + "]."); |
| } |
| |
| String [] dnArgs = (operation == null || |
| operation != StartupOption.ROLLBACK) ? |
| null : new String[] {operation.getName()}; |
| |
| |
| for (int i = curDatanodesNum; i < curDatanodesNum+numDataNodes; i++) { |
| Configuration dnConf = new HdfsConfiguration(conf); |
| // Set up datanode address |
| setupDatanodeAddress(dnConf, setupHostsFile); |
| if (manageDfsDirs) { |
| File dir1 = getStorageDir(i, 0); |
| File dir2 = getStorageDir(i, 1); |
| dir1.mkdirs(); |
| dir2.mkdirs(); |
| if (!dir1.isDirectory() || !dir2.isDirectory()) { |
| throw new IOException("Mkdirs failed to create directory for DataNode " |
| + i + ": " + dir1 + " or " + dir2); |
| } |
| String dirs = fileAsURI(dir1) + "," + fileAsURI(dir2); |
| dnConf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, dirs); |
| conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, dirs); |
| } |
| if (simulatedCapacities != null) { |
| dnConf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true); |
| dnConf.setLong(SimulatedFSDataset.CONFIG_PROPERTY_CAPACITY, |
| simulatedCapacities[i-curDatanodesNum]); |
| } |
| System.out.println("Starting DataNode " + i + " with " |
| + DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY + ": " |
| + dnConf.get(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY)); |
| if (hosts != null) { |
| dnConf.set(DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY, hosts[i - curDatanodesNum]); |
| System.out.println("Starting DataNode " + i + " with hostname set to: " |
| + dnConf.get(DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY)); |
| } |
| if (racks != null) { |
| String name = hosts[i - curDatanodesNum]; |
| System.out.println("Adding node with hostname : " + name + " to rack "+ |
| racks[i-curDatanodesNum]); |
| StaticMapping.addNodeToRack(name, |
| racks[i-curDatanodesNum]); |
| } |
| Configuration newconf = new HdfsConfiguration(dnConf); // save config |
| if (hosts != null) { |
| NetUtils.addStaticResolution(hosts[i - curDatanodesNum], "localhost"); |
| } |
| DataNode dn = DataNode.instantiateDataNode(dnArgs, dnConf); |
| if(dn == null) |
| throw new IOException("Cannot start DataNode in " |
| + dnConf.get(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY)); |
| //since the HDFS does things based on IP:port, we need to add the mapping |
| //for IP:port to rackId |
| String ipAddr = dn.getSelfAddr().getAddress().getHostAddress(); |
| if (racks != null) { |
| int port = dn.getSelfAddr().getPort(); |
| System.out.println("Adding node with IP:port : " + ipAddr + ":" + port+ |
| " to rack " + racks[i-curDatanodesNum]); |
| StaticMapping.addNodeToRack(ipAddr + ":" + port, |
| racks[i-curDatanodesNum]); |
| } |
| dn.runDatanodeDaemon(); |
| dataNodes.add(new DataNodeProperties(dn, newconf, dnArgs)); |
| } |
| curDatanodesNum += numDataNodes; |
| this.numDataNodes += numDataNodes; |
| waitActive(); |
| } |
| |
| |
| |
| /** |
| * Modify the config and start up the DataNodes. The info port for |
| * DataNodes is guaranteed to use a free port. |
| * |
| * @param conf the base configuration to use in starting the DataNodes. This |
| * will be modified as necessary. |
| * @param numDataNodes Number of DataNodes to start; may be zero |
| * @param manageDfsDirs if true, the data directories for DataNodes will be |
| * created and dfs.datanode.data.dir will be set in the conf |
| * @param operation the operation with which to start the DataNodes. If null |
| * or StartupOption.FORMAT, then StartupOption.REGULAR will be used. |
| * @param racks array of strings indicating the rack that each DataNode is on |
| * |
| * @throws IllegalStateException if NameNode has been shutdown |
| */ |
| |
| public void startDataNodes(Configuration conf, int numDataNodes, |
| boolean manageDfsDirs, StartupOption operation, |
| String[] racks |
| ) throws IOException { |
| startDataNodes(conf, numDataNodes, manageDfsDirs, operation, racks, null, |
| null, false); |
| } |
| |
| /** |
| * Modify the config and start up additional DataNodes. The info port for |
| * DataNodes is guaranteed to use a free port. |
| * |
   * Data nodes can run with the name node in the mini cluster or
   * against a real name node, which is useful, for example, when running
   * simulated data nodes against a real name node.
   * If the mini cluster's name node is null, the conf is assumed to already
   * contain the correct address:port of the name node.
| * |
| * @param conf the base configuration to use in starting the DataNodes. This |
| * will be modified as necessary. |
| * @param numDataNodes Number of DataNodes to start; may be zero |
| * @param manageDfsDirs if true, the data directories for DataNodes will be |
| * created and dfs.datanode.data.dir will be set in the conf |
| * @param operation the operation with which to start the DataNodes. If null |
| * or StartupOption.FORMAT, then StartupOption.REGULAR will be used. |
| * @param racks array of strings indicating the rack that each DataNode is on |
| * @param simulatedCapacities array of capacities of the simulated data nodes |
| * |
| * @throws IllegalStateException if NameNode has been shutdown |
| */ |
| public void startDataNodes(Configuration conf, int numDataNodes, |
| boolean manageDfsDirs, StartupOption operation, |
| String[] racks, |
| long[] simulatedCapacities) throws IOException { |
| startDataNodes(conf, numDataNodes, manageDfsDirs, operation, racks, null, |
| simulatedCapacities, false); |
| |
| } |
| |
| /** |
| * Finalize the namenode. Block pools corresponding to the namenode are |
| * finalized on the datanode. |
| */ |
| private void finalizeNamenode(NameNode nn, Configuration conf) throws Exception { |
| if (nn == null) { |
| throw new IllegalStateException("Attempting to finalize " |
| + "Namenode but it is not running"); |
| } |
| ToolRunner.run(new DFSAdmin(conf), new String[] {"-finalizeUpgrade"}); |
| } |
| |
| /** |
| * Finalize cluster for the namenode at the given index |
| * @see MiniDFSCluster#finalizeCluster(Configuration) |
| * @param nnIndex |
| * @param conf |
| * @throws Exception |
| */ |
| public void finalizeCluster(int nnIndex, Configuration conf) throws Exception { |
| finalizeNamenode(nameNodes[nnIndex].nameNode, nameNodes[nnIndex].conf); |
| } |
| |
| /** |
| * If the NameNode is running, attempt to finalize a previous upgrade. |
   * When this method returns, the NameNode should be finalized, but
| * DataNodes may not be since that occurs asynchronously. |
| * |
| * @throws IllegalStateException if the Namenode is not running. |
| */ |
| public void finalizeCluster(Configuration conf) throws Exception { |
| for (NameNodeInfo nnInfo : nameNodes) { |
| if (nnInfo == null) { |
| throw new IllegalStateException("Attempting to finalize " |
| + "Namenode but it is not running"); |
| } |
| finalizeNamenode(nnInfo.nameNode, nnInfo.conf); |
| } |
| } |
| |
| public int getNumNameNodes() { |
| return nameNodes.length; |
| } |
| |
| /** |
| * Gets the started NameNode. May be null. |
| */ |
| public NameNode getNameNode() { |
| checkSingleNameNode(); |
| return getNameNode(0); |
| } |
| |
| /** |
| * Gets the NameNode for the index. May be null. |
| */ |
| public NameNode getNameNode(int nnIndex) { |
| return nameNodes[nnIndex].nameNode; |
| } |
| |
| /** |
| * Return the {@link FSNamesystem} object. |
| * @return {@link FSNamesystem} object. |
| */ |
| public FSNamesystem getNamesystem() { |
| checkSingleNameNode(); |
| return NameNodeAdapter.getNamesystem(nameNodes[0].nameNode); |
| } |
| |
| public FSNamesystem getNamesystem(int nnIndex) { |
| return NameNodeAdapter.getNamesystem(nameNodes[nnIndex].nameNode); |
| } |
| |
| /** |
| * Gets a list of the started DataNodes. May be empty. |
| */ |
| public ArrayList<DataNode> getDataNodes() { |
| ArrayList<DataNode> list = new ArrayList<DataNode>(); |
| for (int i = 0; i < dataNodes.size(); i++) { |
| DataNode node = dataNodes.get(i).datanode; |
| list.add(node); |
| } |
| return list; |
| } |
| |
| /** @return the datanode having the ipc server listen port */ |
| public DataNode getDataNode(int ipcPort) { |
| for(DataNode dn : getDataNodes()) { |
| if (dn.ipcServer.getListenerAddress().getPort() == ipcPort) { |
| return dn; |
| } |
| } |
| return null; |
| } |
| |
| /** |
| * Gets the rpc port used by the NameNode, because the caller |
| * supplied port is not necessarily the actual port used. |
| * Assumption: cluster has a single namenode |
| */ |
| public int getNameNodePort() { |
| checkSingleNameNode(); |
| return getNameNodePort(0); |
| } |
| |
| /** |
| * Gets the rpc port used by the NameNode at the given index, because the |
| * caller supplied port is not necessarily the actual port used. |
| */ |
| public int getNameNodePort(int nnIndex) { |
| return nameNodes[nnIndex].nameNode.getNameNodeAddress().getPort(); |
| } |
| |
| /** |
| * Shutdown all the nodes in the cluster. |
| */ |
| public void shutdown() { |
| System.out.println("Shutting down the Mini HDFS Cluster"); |
| shutdownDataNodes(); |
| for (NameNodeInfo nnInfo : nameNodes) { |
| NameNode nameNode = nnInfo.nameNode; |
| if (nameNode != null) { |
| nameNode.stop(); |
| nameNode.join(); |
| nameNode = null; |
| } |
| } |
| } |
| |
| /** |
| * Shutdown all DataNodes started by this class. The NameNode |
| * is left running so that new DataNodes may be started. |
| */ |
| public void shutdownDataNodes() { |
| for (int i = dataNodes.size()-1; i >= 0; i--) { |
| LOG.info("Shutting down DataNode " + i); |
| DataNode dn = dataNodes.remove(i).datanode; |
| dn.shutdown(); |
| numDataNodes--; |
| } |
| } |
| |
| /** |
| * Shutdown all the namenodes. |
| */ |
| public synchronized void shutdownNameNodes() { |
| for (int i = 0; i < nameNodes.length; i++) { |
| shutdownNameNode(i); |
| } |
| } |
| |
| /** |
| * Shutdown the namenode at a given index. |
| */ |
| public synchronized void shutdownNameNode(int nnIndex) { |
| NameNode nn = nameNodes[nnIndex].nameNode; |
| if (nn != null) { |
| System.out.println("Shutting down the namenode"); |
| nn.stop(); |
| nn.join(); |
| Configuration conf = nameNodes[nnIndex].conf; |
| nameNodes[nnIndex] = new NameNodeInfo(null, conf); |
| } |
| } |
| |
| /** |
| * Restart namenode at a given index. |
| */ |
| public synchronized void restartNameNode(int nnIndex) throws IOException { |
| Configuration conf = nameNodes[nnIndex].conf; |
| shutdownNameNode(nnIndex); |
| NameNode nn = NameNode.createNameNode(new String[] {}, conf); |
| nameNodes[nnIndex] = new NameNodeInfo(nn, conf); |
| waitClusterUp(); |
| System.out.println("Restarted the namenode"); |
| int failedCount = 0; |
| while (true) { |
| try { |
| waitActive(); |
| break; |
| } catch (IOException e) { |
| failedCount++; |
| // Cached RPC connection to namenode, if any, is expected to fail once |
| if (failedCount > 1) { |
| System.out.println("Tried waitActive() " + failedCount |
| + " time(s) and failed, giving up. " |
| + StringUtils.stringifyException(e)); |
| throw e; |
| } |
| } |
| } |
| System.out.println("Cluster is active"); |
| } |
| |
| /** |
| * Return the contents of the given block on the given datanode. |
| * |
| * @param block block to be corrupted |
| * @throws IOException on error accessing the file for the given block |
| */ |
| public int corruptBlockOnDataNodes(ExtendedBlock block) throws IOException{ |
| int blocksCorrupted = 0; |
| File[] blockFiles = getAllBlockFiles(block); |
| for (File f : blockFiles) { |
| if (corruptBlock(f)) { |
| blocksCorrupted++; |
| } |
| } |
| return blocksCorrupted; |
| } |
| |
| public String readBlockOnDataNode(int i, ExtendedBlock block) |
| throws IOException { |
| assert (i >= 0 && i < dataNodes.size()) : "Invalid datanode "+i; |
| File blockFile = getBlockFile(i, block); |
| if (blockFile != null && blockFile.exists()) { |
| return DFSTestUtil.readFile(blockFile); |
| } |
| return null; |
| } |
| |
| /** |
| * Corrupt a block on a particular datanode. |
| * |
| * @param i index of the datanode |
| * @param blk name of the block |
| * @throws IOException on error accessing the given block or if |
| * the contents of the block (on the same datanode) differ. |
| * @return true if a replica was corrupted, false otherwise |
| * Types: delete, write bad data, truncate |
| */ |
| public static boolean corruptReplica(int i, ExtendedBlock blk) |
| throws IOException { |
| File blockFile = getBlockFile(i, blk); |
| return corruptBlock(blockFile); |
| } |
| |
| /* |
| * Corrupt a block on a particular datanode |
| */ |
| public static boolean corruptBlock(File blockFile) throws IOException { |
| if (blockFile == null || !blockFile.exists()) { |
| return false; |
| } |
    // Corrupt the replica by writing a short bad string at a random offset
| Random random = new Random(); |
| RandomAccessFile raFile = new RandomAccessFile(blockFile, "rw"); |
| FileChannel channel = raFile.getChannel(); |
| String badString = "BADBAD"; |
| int rand = random.nextInt((int)channel.size()/2); |
| raFile.seek(rand); |
| raFile.write(badString.getBytes()); |
| raFile.close(); |
| LOG.warn("Corrupting the block " + blockFile); |
| return true; |
| } |
| |
| /* |
| * Shutdown a particular datanode |
| */ |
| public synchronized DataNodeProperties stopDataNode(int i) { |
| if (i < 0 || i >= dataNodes.size()) { |
| return null; |
| } |
| DataNodeProperties dnprop = dataNodes.remove(i); |
| DataNode dn = dnprop.datanode; |
| System.out.println("MiniDFSCluster Stopping DataNode " + |
| dn.getMachineName() + |
| " from a total of " + (dataNodes.size() + 1) + |
| " datanodes."); |
| dn.shutdown(); |
| numDataNodes--; |
| return dnprop; |
| } |
| |
| /* |
| * Shutdown a datanode by name. |
| */ |
| public synchronized DataNodeProperties stopDataNode(String name) { |
| int i; |
| for (i = 0; i < dataNodes.size(); i++) { |
| DataNode dn = dataNodes.get(i).datanode; |
| // get BP registration |
| DatanodeRegistration dnR = |
| DataNodeTestUtils.getDNRegistrationByMachineName(dn, name); |
| LOG.info("for name=" + name + " found bp=" + dnR + |
| "; with dnMn=" + dn.getMachineName()); |
| if(dnR != null) { |
| break; |
| } |
| } |
| return stopDataNode(i); |
| } |
| |
| /** |
| * Restart a datanode |
| * @param dnprop datanode's property |
| * @return true if restarting is successful |
| * @throws IOException |
| */ |
| public boolean restartDataNode(DataNodeProperties dnprop) throws IOException { |
| return restartDataNode(dnprop, false); |
| } |
| |
| /** |
| * Restart a datanode, on the same port if requested |
| * @param dnprop the datanode to restart |
| * @param keepPort whether to use the same port |
| * @return true if restarting is successful |
| * @throws IOException |
| */ |
| public synchronized boolean restartDataNode(DataNodeProperties dnprop, |
| boolean keepPort) throws IOException { |
| Configuration conf = dnprop.conf; |
| String[] args = dnprop.dnArgs; |
| Configuration newconf = new HdfsConfiguration(conf); // save cloned config |
| if (keepPort) { |
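      // reuse the previous address so the restarted datanode keeps the same port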
| InetSocketAddress addr = dnprop.datanode.getSelfAddr(); |
| conf.set("dfs.datanode.address", addr.getAddress().getHostAddress() + ":" |
| + addr.getPort()); |
| } |
| dataNodes.add(new DataNodeProperties(DataNode.createDataNode(args, conf), |
| newconf, args)); |
| numDataNodes++; |
| return true; |
| } |
| |
| /* |
| * Restart a particular datanode, use newly assigned port |
| */ |
| public boolean restartDataNode(int i) throws IOException { |
| return restartDataNode(i, false); |
| } |
| |
| /* |
| * Restart a particular datanode, on the same port if keepPort is true |
| */ |
| public synchronized boolean restartDataNode(int i, boolean keepPort) |
| throws IOException { |
| DataNodeProperties dnprop = stopDataNode(i); |
| if (dnprop == null) { |
| return false; |
| } else { |
| return restartDataNode(dnprop, keepPort); |
| } |
| } |
| |
| /* |
| * Restart all datanodes, on the same ports if keepPort is true |
| */ |
| public synchronized boolean restartDataNodes(boolean keepPort) |
| throws IOException { |
| for (int i = dataNodes.size() - 1; i >= 0; i--) { |
| if (!restartDataNode(i, keepPort)) |
| return false; |
| System.out.println("Restarted DataNode " + i); |
| } |
| return true; |
| } |
| |
| /* |
| * Restart all datanodes, use newly assigned ports |
| */ |
| public boolean restartDataNodes() throws IOException { |
| return restartDataNodes(false); |
| } |
| |
| /** |
| * Returns true if the NameNode is running and is out of Safe Mode |
| * or if waiting for safe mode is disabled. |
| */ |
| public boolean isNameNodeUp(int nnIndex) { |
| NameNode nameNode = nameNodes[nnIndex].nameNode; |
| if (nameNode == null) { |
| return false; |
| } |
| long[] sizes = nameNode.getStats(); |
| boolean isUp = false; |
| synchronized (this) { |
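      // sizes[0] is the total cluster capacity; it remains zero until
      // datanodes have registered and reported their capacity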
| isUp = ((!nameNode.isInSafeMode() || !waitSafeMode) && sizes[0] != 0); |
| } |
| return isUp; |
| } |
| |
| /** |
   * Returns true if all the NameNodes are running and are out of Safe Mode.
| */ |
| public boolean isClusterUp() { |
| for (int index = 0; index < nameNodes.length; index++) { |
| if (!isNameNodeUp(index)) { |
| return false; |
| } |
| } |
| return true; |
| } |
| |
| /** |
| * Returns true if there is at least one DataNode running. |
| */ |
| public boolean isDataNodeUp() { |
| if (dataNodes == null || dataNodes.size() == 0) { |
| return false; |
| } |
| for (DataNodeProperties dn : dataNodes) { |
| if (dn.datanode.isDatanodeUp()) { |
| return true; |
| } |
| } |
| return false; |
| } |
| |
| /** |
| * Get a client handle to the DFS cluster with a single namenode. |
| */ |
| public FileSystem getFileSystem() throws IOException { |
| checkSingleNameNode(); |
| return getFileSystem(0); |
| } |
| |
| /** |
| * Get a client handle to the DFS cluster for the namenode at given index. |
| */ |
| public FileSystem getFileSystem(int nnIndex) throws IOException { |
| return FileSystem.get(getURI(nnIndex), nameNodes[nnIndex].conf); |
| } |
| |
| /** |
   * Get another FileSystem instance that is different from FileSystem.get(conf).
   * This simulates different threads working on different FileSystem instances.
| */ |
| public FileSystem getNewFileSystemInstance(int nnIndex) throws IOException { |
| return FileSystem.newInstance(getURI(nnIndex), nameNodes[nnIndex].conf); |
| } |
| |
| /** |
   * @return an HTTP URL
| */ |
| public String getHttpUri(int nnIndex) throws IOException { |
| return "http://" |
| + nameNodes[nnIndex].conf |
| .get(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY); |
| } |
| |
| /** |
| * @return a {@link HftpFileSystem} object. |
| */ |
| public HftpFileSystem getHftpFileSystem(int nnIndex) throws IOException { |
| String uri = "hftp://" |
| + nameNodes[nnIndex].conf |
| .get(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY); |
| try { |
| return (HftpFileSystem)FileSystem.get(new URI(uri), conf); |
| } catch (URISyntaxException e) { |
| throw new IOException(e); |
| } |
| } |
| |
| /** |
| * @return a {@link HftpFileSystem} object as specified user. |
| */ |
| public HftpFileSystem getHftpFileSystemAs(final String username, |
| final Configuration conf, final int nnIndex, final String... groups) |
| throws IOException, InterruptedException { |
| final UserGroupInformation ugi = UserGroupInformation.createUserForTesting( |
| username, groups); |
| return ugi.doAs(new PrivilegedExceptionAction<HftpFileSystem>() { |
| @Override |
| public HftpFileSystem run() throws Exception { |
| return getHftpFileSystem(nnIndex); |
| } |
| }); |
| } |
| |
| /** |
| * Get the directories where the namenode stores its image. |
| */ |
| public Collection<URI> getNameDirs(int nnIndex) { |
| return FSNamesystem.getNamespaceDirs(nameNodes[nnIndex].conf); |
| } |
| |
| /** |
| * Get the directories where the namenode stores its edits. |
| */ |
| public Collection<URI> getNameEditsDirs(int nnIndex) { |
| return FSNamesystem.getNamespaceEditsDirs(nameNodes[nnIndex].conf); |
| } |
| |
  /** Wait until the given namenode gets registrations from all the datanodes */
| public void waitActive(int nnIndex) throws IOException { |
| if (nameNodes.length == 0 || nameNodes[nnIndex] == null) { |
| return; |
| } |
| InetSocketAddress addr = nameNodes[nnIndex].nameNode.getServiceRpcAddress(); |
| DFSClient client = new DFSClient(addr, conf); |
| |
| // ensure all datanodes have registered and sent heartbeat to the namenode |
| while (shouldWait(client.datanodeReport(DatanodeReportType.LIVE), addr)) { |
| try { |
| LOG.info("Waiting for cluster to become active"); |
| Thread.sleep(100); |
| } catch (InterruptedException e) { |
| } |
| } |
| |
| client.close(); |
| } |
| |
| /** |
| * Wait until the cluster is active and running. |
| */ |
| public void waitActive() throws IOException { |
| for (int index = 0; index < nameNodes.length; index++) { |
| waitActive(index); |
| } |
| } |
| |
| private synchronized boolean shouldWait(DatanodeInfo[] dnInfo, |
| InetSocketAddress addr) { |
| // If a datanode failed to start, then do not wait |
| for (DataNodeProperties dn : dataNodes) { |
| // the datanode thread communicating with the namenode should be alive |
| if (!dn.datanode.isBPServiceAlive(addr)) { |
| LOG.warn("BPOfferService failed to start in datanode " + dn.datanode |
| + " for namenode at " + addr); |
| return false; |
| } |
| } |
| |
| // Wait for expected number of datanodes to start |
| if (dnInfo.length != numDataNodes) { |
| return true; |
| } |
| |
| // if one of the data nodes is not fully started, continue to wait |
| for (DataNodeProperties dn : dataNodes) { |
| if (!dn.datanode.isDatanodeFullyStarted()) { |
| return true; |
| } |
| } |
| |
| // make sure all datanodes have sent first heartbeat to namenode, |
| // using (capacity == 0) as proxy. |
| for (DatanodeInfo dn : dnInfo) { |
| if (dn.getCapacity() == 0) { |
| return true; |
| } |
| } |
| |
| // If datanode dataset is not initialized then wait |
| for (DataNodeProperties dn : dataNodes) { |
| if (dn.datanode.data == null) { |
| return true; |
| } |
| } |
| return false; |
| } |
| |
| public void formatDataNodeDirs() throws IOException { |
| base_dir = new File(getBaseDirectory()); |
| data_dir = new File(base_dir, "data"); |
| if (data_dir.exists() && !FileUtil.fullyDelete(data_dir)) { |
| throw new IOException("Cannot remove data directory: " + data_dir); |
| } |
| } |
| |
| /** |
| * |
| * @param dataNodeIndex - data node whose block report is desired - the index is same as for getDataNodes() |
| * @return the block report for the specified data node |
| */ |
| public Iterable<Block> getBlockReport(String bpid, int dataNodeIndex) { |
    if (dataNodeIndex < 0 || dataNodeIndex >= dataNodes.size()) {
| throw new IndexOutOfBoundsException(); |
| } |
| return dataNodes.get(dataNodeIndex).datanode.getFSDataset().getBlockReport( |
| bpid); |
| } |
| |
| |
| /** |
| * |
| * @return block reports from all data nodes |
| * BlockListAsLongs is indexed in the same order as the list of datanodes returned by getDataNodes() |
| */ |
| public Iterable<Block>[] getAllBlockReports(String bpid) { |
| int numDataNodes = dataNodes.size(); |
| Iterable<Block>[] result = new BlockListAsLongs[numDataNodes]; |
| for (int i = 0; i < numDataNodes; ++i) { |
| result[i] = getBlockReport(bpid, i); |
| } |
| return result; |
| } |
| |
| |
| /** |
| * This method is valid only if the data nodes have simulated data |
| * @param dataNodeIndex - data node i which to inject - the index is same as for getDataNodes() |
| * @param blocksToInject - the blocks |
| * @throws IOException |
| * if not simulatedFSDataset |
| * if any of blocks already exist in the data node |
| * |
| */ |
| public void injectBlocks(int dataNodeIndex, Iterable<Block> blocksToInject) throws IOException { |
    if (dataNodeIndex < 0 || dataNodeIndex >= dataNodes.size()) {
| throw new IndexOutOfBoundsException(); |
| } |
| FSDatasetInterface dataSet = dataNodes.get(dataNodeIndex).datanode.getFSDataset(); |
| if (!(dataSet instanceof SimulatedFSDataset)) { |
| throw new IOException("injectBlocks is valid only for SimilatedFSDataset"); |
| } |
| String bpid = getNamesystem().getBlockPoolId(); |
| SimulatedFSDataset sdataset = (SimulatedFSDataset) dataSet; |
| sdataset.injectBlocks(bpid, blocksToInject); |
| dataNodes.get(dataNodeIndex).datanode.scheduleAllBlockReport(0); |
| } |
| |
| /** |
   * Multiple-NameNode version of {@link #injectBlocks(int, Iterable)}.
| */ |
| public void injectBlocks(int nameNodeIndex, int dataNodeIndex, |
| Iterable<Block> blocksToInject) throws IOException { |
    if (dataNodeIndex < 0 || dataNodeIndex >= dataNodes.size()) {
| throw new IndexOutOfBoundsException(); |
| } |
| FSDatasetInterface dataSet = dataNodes.get(dataNodeIndex).datanode.getFSDataset(); |
| if (!(dataSet instanceof SimulatedFSDataset)) { |
| throw new IOException("injectBlocks is valid only for SimilatedFSDataset"); |
| } |
| String bpid = getNamesystem(nameNodeIndex).getBlockPoolId(); |
| SimulatedFSDataset sdataset = (SimulatedFSDataset) dataSet; |
| sdataset.injectBlocks(bpid, blocksToInject); |
| dataNodes.get(dataNodeIndex).datanode.scheduleAllBlockReport(0); |
| } |
| |
| /** |
| * This method is valid only if the data nodes have simulated data |
| * @param blocksToInject - blocksToInject[] is indexed in the same order as the list |
| * of datanodes returned by getDataNodes() |
| * @throws IOException |
| * if not simulatedFSDataset |
| * if any of blocks already exist in the data nodes |
| * Note the rest of the blocks are not injected. |
| */ |
| public void injectBlocks(Iterable<Block>[] blocksToInject) |
| throws IOException { |
| if (blocksToInject.length > dataNodes.size()) { |
| throw new IndexOutOfBoundsException(); |
| } |
| for (int i = 0; i < blocksToInject.length; ++i) { |
| injectBlocks(i, blocksToInject[i]); |
| } |
| } |
| |
| /** |
| * Set the softLimit and hardLimit of client lease periods |
| */ |
| void setLeasePeriod(long soft, long hard) { |
| final FSNamesystem namesystem = getNamesystem(); |
| namesystem.leaseManager.setLeasePeriod(soft, hard); |
| namesystem.lmthread.interrupt(); |
| } |
| |
| /** |
| * Returns the current set of datanodes |
| */ |
| DataNode[] listDataNodes() { |
| DataNode[] list = new DataNode[dataNodes.size()]; |
| for (int i = 0; i < dataNodes.size(); i++) { |
| list[i] = dataNodes.get(i).datanode; |
| } |
| return list; |
| } |
| |
| /** |
| * Access to the data directory used for Datanodes |
| */ |
| public String getDataDirectory() { |
| return data_dir.getAbsolutePath(); |
| } |
| |
| public static String getBaseDirectory() { |
| return System.getProperty("test.build.data", "build/test/data") + "/dfs/"; |
| } |
| |
| /** |
| * Get a storage directory for a datanode. There are two storage directories |
| * per datanode: |
| * <ol> |
| * <li><base directory>/data/data<2*dnIndex + 1></li> |
| * <li><base directory>/data/data<2*dnIndex + 2></li> |
| * </ol> |
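   * For example, the first datanode (dnIndex 0) stores its data under
   * <base directory>/data/data1 and <base directory>/data/data2.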
| * |
| * @param dnIndex datanode index (starts from 0) |
| * @param dirIndex directory index (0 or 1). Index 0 provides access to the |
| * first storage directory. Index 1 provides access to the second |
| * storage directory. |
| * @return Storage directory |
| */ |
| public static File getStorageDir(int dnIndex, int dirIndex) { |
| return new File(getBaseDirectory() + "data/data" + (2*dnIndex + 1 + dirIndex)); |
| } |
| |
| /** |
| * Get current directory corresponding to the datanode |
| * @param storageDir |
| * @return current directory |
| */ |
| public static String getDNCurrentDir(File storageDir) { |
| return storageDir + "/" + Storage.STORAGE_DIR_CURRENT + "/"; |
| } |
| |
| /** |
| * Get directory corresponding to block pool directory in the datanode |
| * @param storageDir |
| * @return current directory |
| */ |
| public static String getBPDir(File storageDir, String bpid) { |
| return getDNCurrentDir(storageDir) + bpid + "/"; |
| } |
| /** |
| * Get directory relative to block pool directory in the datanode |
| * @param storageDir |
| * @return current directory |
| */ |
| public static String getBPDir(File storageDir, String bpid, String dirName) { |
| return getBPDir(storageDir, bpid) + dirName + "/"; |
| } |
| |
| /** |
| * Get finalized directory for a block pool |
| * @param storageDir storage directory |
| * @param bpid Block pool Id |
| * @return finalized directory for a block pool |
| */ |
| public static File getRbwDir(File storageDir, String bpid) { |
| return new File(getBPDir(storageDir, bpid, Storage.STORAGE_DIR_CURRENT) |
| + DataStorage.STORAGE_DIR_RBW ); |
| } |
| |
| /** |
| * Get finalized directory for a block pool |
| * @param storageDir storage directory |
| * @param bpid Block pool Id |
| * @return finalized directory for a block pool |
| */ |
| public static File getFinalizedDir(File storageDir, String bpid) { |
| return new File(getBPDir(storageDir, bpid, Storage.STORAGE_DIR_CURRENT) |
| + DataStorage.STORAGE_DIR_FINALIZED ); |
| } |
| |
| /** |
| * Get file correpsonding to a block |
| * @param storageDir storage directory |
| * @param blk block to be corrupted |
| * @return file corresponding to the block |
| */ |
| public static File getBlockFile(File storageDir, ExtendedBlock blk) { |
| return new File(getFinalizedDir(storageDir, blk.getBlockPoolId()), |
| blk.getBlockName()); |
| } |
| |
| /** |
| * Get all files related to a block from all the datanodes |
| * @param block block for which corresponding files are needed |
| */ |
| public File[] getAllBlockFiles(ExtendedBlock block) { |
| if (dataNodes.size() == 0) return new File[0]; |
| ArrayList<File> list = new ArrayList<File>(); |
| for (int i=0; i < dataNodes.size(); i++) { |
| File blockFile = getBlockFile(i, block); |
| if (blockFile != null) { |
| list.add(blockFile); |
| } |
| } |
| return list.toArray(new File[list.size()]); |
| } |
| |
| /** |
| * Get files related to a block for a given datanode |
| * @param dnIndex Index of the datanode to get block files for |
| * @param block block for which corresponding files are needed |
| */ |
| public static File getBlockFile(int dnIndex, ExtendedBlock block) { |
| // Check for block file in the two storage directories of the datanode |
| for (int i = 0; i <=1 ; i++) { |
| File storageDir = MiniDFSCluster.getStorageDir(dnIndex, i); |
| File blockFile = getBlockFile(storageDir, block); |
| if (blockFile.exists()) { |
| return blockFile; |
| } |
| } |
| return null; |
| } |
| |
| /** |
| * Throw an exception if the MiniDFSCluster is not started with a single |
| * namenode |
| */ |
| private void checkSingleNameNode() { |
| if (nameNodes.length != 1) { |
| throw new IllegalArgumentException("Namenode index is needed"); |
| } |
| } |
| |
| /** |
| * Add a namenode to a federated cluster and start it. Configuration of |
| * datanodes in the cluster is refreshed to register with the new namenode. |
| * |
| * @return newly started namenode |
| */ |
| public NameNode addNameNode(Configuration conf, int namenodePort) |
| throws IOException { |
| if(!federation) |
| throw new IOException("cannot add namenode to non-federated cluster"); |
| |
| int nnIndex = nameNodes.length; |
| int numNameNodes = nameNodes.length + 1; |
| NameNodeInfo[] newlist = new NameNodeInfo[numNameNodes]; |
| System.arraycopy(nameNodes, 0, newlist, 0, nameNodes.length); |
| nameNodes = newlist; |
| String nameserviceId = NAMESERVICE_ID_PREFIX + (nnIndex + 1); |
| |
| String nameserviceIds = conf.get(DFSConfigKeys.DFS_FEDERATION_NAMESERVICES); |
| nameserviceIds += "," + nameserviceId; |
| conf.set(DFSConfigKeys.DFS_FEDERATION_NAMESERVICES, nameserviceIds); |
| |
| initFederatedNamenodeAddress(conf, nameserviceId, namenodePort); |
| createFederatedNameNode(nnIndex, conf, numDataNodes, true, true, null, |
| null, nameserviceId); |
| |
| // Refresh datanodes with the newly started namenode |
| for (DataNodeProperties dn : dataNodes) { |
| DataNode datanode = dn.datanode; |
| datanode.refreshNamenodes(conf); |
| } |
| |
| // Wait for new namenode to get registrations from all the datanodes |
| waitActive(nnIndex); |
| return nameNodes[nnIndex].nameNode; |
| } |
| |
| private int getFreeSocketPort() { |
| int port = 0; |
| try { |
| ServerSocket s = new ServerSocket(0); |
| port = s.getLocalPort(); |
| s.close(); |
| return port; |
| } catch (IOException e) { |
| // Could not get a free port. Return default port 0. |
| } |
| return port; |
| } |
| |
| private void setupDatanodeAddress(Configuration conf, boolean setupHostsFile) throws IOException { |
| if (setupHostsFile) { |
| String hostsFile = conf.get(DFSConfigKeys.DFS_HOSTS, "").trim(); |
| if (hostsFile.length() == 0) { |
| throw new IOException("Parameter dfs.hosts is not setup in conf"); |
| } |
| // Setup datanode in the include file, if it is defined in the conf |
| String address = "127.0.0.1:" + getFreeSocketPort(); |
| conf.set("dfs.datanode.address", address); |
| addToFile(hostsFile, address); |
| LOG.info("Adding datanode " + address + " to hosts file " + hostsFile); |
| } else { |
| conf.set("dfs.datanode.address", "127.0.0.1:0"); |
| conf.set("dfs.datanode.http.address", "127.0.0.1:0"); |
| conf.set("dfs.datanode.ipc.address", "127.0.0.1:0"); |
| } |
| } |
| |
| private void addToFile(String p, String address) throws IOException { |
| File f = new File(p); |
| f.createNewFile(); |
| PrintWriter writer = new PrintWriter(new FileWriter(f, true)); |
| try { |
| writer.println(address); |
| } finally { |
| writer.close(); |
| } |
| } |
| } |