| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.hdfs.server.datanode; |
| |
| import java.io.File; |
| import java.io.IOException; |
| |
| import com.google.common.base.Supplier; |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.conf.ReconfigurationException; |
| import org.apache.hadoop.hdfs.DFSConfigKeys; |
| import org.apache.hadoop.hdfs.MiniDFSCluster; |
| import org.apache.hadoop.hdfs.protocol.DatanodeID; |
| import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; |
| import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; |
| import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil; |
| import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl; |
| import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; |
| import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol; |
| import org.apache.hadoop.test.GenericTestUtils; |
| |
| import static org.hamcrest.core.Is.is; |
| import static org.junit.Assert.assertThat; |
| |
| /** |
| * Utility class for accessing package-private DataNode information during tests. |
| * Must not contain usage of classes that are not explicitly listed as |
| * dependencies to {@link MiniDFSCluster}. |
| */ |
| public class DataNodeTestUtils { |
| private static final Log LOG = |
| LogFactory.getLog(DataNodeTestUtils.class); |
| private static final String DIR_FAILURE_SUFFIX = ".origin"; |
| |
| public final static String TEST_CLUSTER_ID = "testClusterID"; |
| public final static String TEST_POOL_ID = "BP-TEST"; |
| |
| public static DatanodeRegistration |
| getDNRegistrationForBP(DataNode dn, String bpid) throws IOException { |
| return dn.getDNRegistrationForBP(bpid); |
| } |
| |
| public static void setHeartbeatsDisabledForTests(DataNode dn, |
| boolean heartbeatsDisabledForTests) { |
| dn.setHeartbeatsDisabledForTests(heartbeatsDisabledForTests); |
| } |
| |
| /** |
| * Set if cache reports are disabled for all DNs in a mini cluster. |
| */ |
| public static void setCacheReportsDisabledForTests(MiniDFSCluster cluster, |
| boolean disabled) { |
| for (DataNode dn : cluster.getDataNodes()) { |
| dn.setCacheReportsDisabledForTest(disabled); |
| } |
| } |
| |
| public static void triggerDeletionReport(DataNode dn) throws IOException { |
| for (BPOfferService bpos : dn.getAllBpOs()) { |
| bpos.triggerDeletionReportForTests(); |
| } |
| } |
| |
| public static void triggerHeartbeat(DataNode dn) throws IOException { |
| for (BPOfferService bpos : dn.getAllBpOs()) { |
| bpos.triggerHeartbeatForTests(); |
| } |
| } |
| |
| public static void triggerBlockReport(DataNode dn) throws IOException { |
| for (BPOfferService bpos : dn.getAllBpOs()) { |
| bpos.triggerBlockReportForTests(); |
| } |
| } |
| |
| public static InterDatanodeProtocol createInterDatanodeProtocolProxy( |
| DataNode dn, DatanodeID datanodeid, final Configuration conf, |
| boolean connectToDnViaHostname) throws IOException { |
| if (connectToDnViaHostname != dn.getDnConf().connectToDnViaHostname) { |
| throw new AssertionError("Unexpected DN hostname configuration"); |
| } |
| return DataNode.createInterDataNodeProtocolProxy(datanodeid, conf, |
| dn.getDnConf().socketTimeout, dn.getDnConf().connectToDnViaHostname); |
| } |
| |
| /** |
| * This method is used for testing. |
| * Examples are adding and deleting blocks directly. |
| * The most common usage will be when the data node's storage is simulated. |
| * |
| * @return the fsdataset that stores the blocks |
| */ |
| public static FsDatasetSpi<?> getFSDataset(DataNode dn) { |
| return dn.getFSDataset(); |
| } |
| |
| /** |
| * Fetch a copy of ReplicaInfo from a datanode by block id |
| * @param dn datanode to retrieve a replicainfo object from |
| * @param bpid Block pool Id |
| * @param blkId id of the replica's block |
| * @return copy of ReplicaInfo object @link{FSDataset#fetchReplicaInfo} |
| */ |
| public static ReplicaInfo fetchReplicaInfo(final DataNode dn, |
| final String bpid, final long blkId) { |
| return FsDatasetTestUtil.fetchReplicaInfo(dn.getFSDataset(), bpid, blkId); |
| } |
| |
| /** |
| * It injects disk failures to data dirs by replacing these data dirs with |
| * regular files. |
| * |
| * @param dirs data directories. |
| * @throws IOException on I/O error. |
| */ |
| public static void injectDataDirFailure(File... dirs) throws IOException { |
| for (File dir : dirs) { |
| File renamedTo = new File(dir.getPath() + DIR_FAILURE_SUFFIX); |
| if (renamedTo.exists()) { |
| throw new IOException(String.format( |
| "Can not inject failure to dir: %s because %s exists.", |
| dir, renamedTo)); |
| } |
| if (!dir.renameTo(renamedTo)) { |
| throw new IOException(String.format("Failed to rename %s to %s.", |
| dir, renamedTo)); |
| } |
| if (!dir.createNewFile()) { |
| throw new IOException(String.format( |
| "Failed to create file %s to inject disk failure.", dir)); |
| } |
| } |
| } |
| |
| /** |
| * Restore the injected data dir failures. |
| * |
| * @see {@link #injectDataDirFailures}. |
| * @param dirs data directories. |
| * @throws IOException |
| */ |
| public static void restoreDataDirFromFailure(File... dirs) |
| throws IOException { |
| for (File dir : dirs) { |
| File renamedDir = new File(dir.getPath() + DIR_FAILURE_SUFFIX); |
| if (renamedDir.exists()) { |
| if (dir.exists()) { |
| if (!dir.isFile()) { |
| throw new IOException( |
| "Injected failure data dir is supposed to be file: " + dir); |
| } |
| if (!dir.delete()) { |
| throw new IOException( |
| "Failed to delete injected failure data dir: " + dir); |
| } |
| } |
| if (!renamedDir.renameTo(dir)) { |
| throw new IOException(String.format( |
| "Failed to recover injected failure data dir %s to %s.", |
| renamedDir, dir)); |
| } |
| } |
| } |
| } |
| |
| public static void runDirectoryScanner(DataNode dn) throws IOException { |
| DirectoryScanner directoryScanner = dn.getDirectoryScanner(); |
| if (directoryScanner != null) { |
| dn.getDirectoryScanner().reconcile(); |
| } |
| } |
| |
| /** |
| * Reconfigure a DataNode by setting a new list of volumes. |
| * |
| * @param dn DataNode to reconfigure |
| * @param newVols new volumes to configure |
| * @throws Exception if there is any failure |
| */ |
| public static void reconfigureDataNode(DataNode dn, File... newVols) |
| throws Exception { |
| StringBuilder dnNewDataDirs = new StringBuilder(); |
| for (File newVol: newVols) { |
| if (dnNewDataDirs.length() > 0) { |
| dnNewDataDirs.append(','); |
| } |
| dnNewDataDirs.append(newVol.getAbsolutePath()); |
| } |
| try { |
| assertThat( |
| dn.reconfigurePropertyImpl( |
| DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, |
| dnNewDataDirs.toString()), |
| is(dn.getConf().get(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY))); |
| } catch (ReconfigurationException e) { |
| // This can be thrown if reconfiguration tries to use a failed volume. |
| // We need to swallow the exception, because some of our tests want to |
| // cover this case. |
| LOG.warn("Could not reconfigure DataNode.", e); |
| } |
| } |
| |
| /** Get the FsVolume on the given basePath. */ |
| public static FsVolumeImpl getVolume(DataNode dn, File basePath) throws |
| IOException { |
| try (FsDatasetSpi.FsVolumeReferences volumes = dn.getFSDataset() |
| .getFsVolumeReferences()) { |
| for (FsVolumeSpi vol : volumes) { |
| return (FsVolumeImpl) vol; |
| } |
| } |
| return null; |
| } |
| |
| /** |
| * Call and wait DataNode to detect disk failure. |
| * |
| * @param dn |
| * @param volume |
| * @throws Exception |
| */ |
| public static void waitForDiskError(final DataNode dn, FsVolumeSpi volume) |
| throws Exception { |
| LOG.info("Starting to wait for datanode to detect disk failure."); |
| final long lastDiskErrorCheck = dn.getLastDiskErrorCheck(); |
| dn.checkDiskErrorAsync(volume); |
| // Wait 10 seconds for checkDiskError thread to finish and discover volume |
| // failures. |
| GenericTestUtils.waitFor(new Supplier<Boolean>() { |
| |
| @Override |
| public Boolean get() { |
| return dn.getLastDiskErrorCheck() != lastDiskErrorCheck; |
| } |
| }, 100, 10000); |
| } |
| } |