| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.hdfs.server.datanode.checker; |
| |
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_PERMISSION_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_PERMISSION_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DISK_CHECK_TIMEOUT_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DISK_CHECK_TIMEOUT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY;
| |
| import com.google.common.base.Optional; |
| import com.google.common.collect.Maps; |
| import com.google.common.util.concurrent.ListenableFuture; |
| import com.google.common.util.concurrent.ThreadFactoryBuilder; |
| import org.apache.hadoop.classification.InterfaceAudience; |
| import org.apache.hadoop.classification.InterfaceStability; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.LocalFileSystem; |
| import org.apache.hadoop.fs.permission.FsPermission; |
| import org.apache.hadoop.hdfs.server.datanode.DataNode; |
| import org.apache.hadoop.hdfs.server.datanode.StorageLocation; |
| import org.apache.hadoop.hdfs.server.datanode.StorageLocation.CheckContext; |
| import org.apache.hadoop.util.DiskChecker.DiskErrorException; |
| import org.apache.hadoop.util.Timer; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.HashSet; |
| import java.util.LinkedHashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.TimeoutException; |
| |
| /** |
| * A utility class that encapsulates checking storage locations during DataNode |
| * startup. |
| * |
| * Some of this code was extracted from the DataNode class. |
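 *
 * <p>A minimal usage sketch (assuming a {@code Configuration conf} and a
 * {@code Collection<StorageLocation> dataDirs} are in scope; error
 * handling elided):
 * <pre>{@code
 * StorageLocationChecker checker =
 *     new StorageLocationChecker(conf, new Timer());
 * List<StorageLocation> healthyVolumes = checker.check(conf, dataDirs);
 * checker.shutdownAndWait(1, TimeUnit.SECONDS);
 * }</pre>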
| */ |
| @InterfaceAudience.Private |
| @InterfaceStability.Unstable |
| public class StorageLocationChecker { |
| public static final Logger LOG = LoggerFactory.getLogger( |
| StorageLocationChecker.class); |
| private final AsyncChecker<CheckContext, VolumeCheckResult> delegateChecker; |
| private final Timer timer; |
| |
| /** |
| * Max allowed time for a disk check in milliseconds. If the check |
| * doesn't complete within this time we declare the disk as dead. |
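   * Configured via {@code dfs.datanode.disk.check.timeout}; default 10m.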
| */ |
  private final long maxAllowedTimeForCheckMs;

| /** |
| * Expected filesystem permissions on the storage directory. |
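   * Configured via {@code dfs.datanode.data.dir.perm}; default 700.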
| */ |
| private final FsPermission expectedPermission; |
| |
| /** |
| * Maximum number of volume failures that can be tolerated without |
| * declaring a fatal error. |
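   * Configured via {@code dfs.datanode.failed.volumes.tolerated}. The
   * special value {@code DataNode.MAX_VOLUME_FAILURE_TOLERATED_LIMIT}
   * (-1) tolerates any number of failures as long as at least one
   * volume remains healthy.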
| */ |
| private final int maxVolumeFailuresTolerated; |
| |
  /**
   * Create a StorageLocationChecker, reading the disk check timeout,
   * the expected storage directory permissions and the tolerated number
   * of volume failures from the supplied configuration.
   *
   * @param conf configuration to read check settings from.
   * @param timer timer used to bound the cumulative check duration.
   * @throws DiskErrorException if a configured value is invalid.
   */
  public StorageLocationChecker(Configuration conf, Timer timer)
| throws DiskErrorException { |
| maxAllowedTimeForCheckMs = conf.getTimeDuration( |
| DFS_DATANODE_DISK_CHECK_TIMEOUT_KEY, |
| DFS_DATANODE_DISK_CHECK_TIMEOUT_DEFAULT, |
| TimeUnit.MILLISECONDS); |
| |
| if (maxAllowedTimeForCheckMs <= 0) { |
| throw new DiskErrorException("Invalid value configured for " |
| + DFS_DATANODE_DISK_CHECK_TIMEOUT_KEY + " - " |
| + maxAllowedTimeForCheckMs + " (should be > 0)"); |
| } |
| |
| expectedPermission = new FsPermission( |
| conf.get(DFS_DATANODE_DATA_DIR_PERMISSION_KEY, |
| DFS_DATANODE_DATA_DIR_PERMISSION_DEFAULT)); |
| |
| maxVolumeFailuresTolerated = conf.getInt( |
| DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, |
| DFS_DATANODE_FAILED_VOLUMES_TOLERATED_DEFAULT); |
| |
    if (maxVolumeFailuresTolerated <
        DataNode.MAX_VOLUME_FAILURE_TOLERATED_LIMIT) {
| throw new DiskErrorException("Invalid value configured for " |
| + DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY + " - " |
| + maxVolumeFailuresTolerated + " " |
| + DataNode.MAX_VOLUME_FAILURES_TOLERATED_MSG); |
| } |
| |
| this.timer = timer; |
| |
| delegateChecker = new ThrottledAsyncChecker<>( |
| timer, |
| conf.getTimeDuration( |
            DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY,
            DFS_DATANODE_DISK_CHECK_MIN_GAP_DEFAULT,
| TimeUnit.MILLISECONDS), |
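        // Disk check timeout of 0 here: the time limit is instead
        // enforced in check() via maxAllowedTimeForCheckMs.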
| 0, |
| Executors.newCachedThreadPool( |
| new ThreadFactoryBuilder() |
| .setNameFormat("StorageLocationChecker thread %d") |
| .setDaemon(true) |
| .build())); |
| } |
| |
| /** |
| * Initiate a check on the supplied storage volumes and return |
| * a list of healthy volumes. |
| * |
| * StorageLocations are returned in the same order as the input |
| * for compatibility with existing unit tests. |
| * |
| * @param conf HDFS configuration. |
| * @param dataDirs list of volumes to check. |
   * @return a list of healthy volumes. Never empty; an exception is
   *         thrown if no healthy volumes remain.
| * |
| * @throws InterruptedException if the check was interrupted. |
| * @throws IOException if the number of failed volumes exceeds the |
| * maximum allowed or if there are no good |
| * volumes. |
| */ |
| public List<StorageLocation> check( |
| final Configuration conf, |
| final Collection<StorageLocation> dataDirs) |
| throws InterruptedException, IOException { |
| |
    // A LinkedHashMap preserves input order, so healthy volumes are
    // returned in the same order they were configured.
    final Map<StorageLocation, Boolean> goodLocations =
        new LinkedHashMap<>();
| final Set<StorageLocation> failedLocations = new HashSet<>(); |
| final Map<StorageLocation, ListenableFuture<VolumeCheckResult>> futures = |
| Maps.newHashMap(); |
| final LocalFileSystem localFS = FileSystem.getLocal(conf); |
| final CheckContext context = new CheckContext(localFS, expectedPermission); |
| |
| // Start parallel disk check operations on all StorageLocations. |
| for (StorageLocation location : dataDirs) { |
| goodLocations.put(location, true); |
| Optional<ListenableFuture<VolumeCheckResult>> olf = |
| delegateChecker.schedule(location, context); |
| if (olf.isPresent()) { |
| futures.put(location, olf.get()); |
| } |
| } |
| |
| if (maxVolumeFailuresTolerated >= dataDirs.size()) { |
| throw new DiskErrorException("Invalid value configured for " |
| + DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY + " - " |
| + maxVolumeFailuresTolerated + ". Value configured is >= " |
| + "to the number of configured volumes (" + dataDirs.size() + ")."); |
| } |
| |
| final long checkStartTimeMs = timer.monotonicNow(); |
| |
| // Retrieve the results of the disk checks. |
| for (Map.Entry<StorageLocation, |
| ListenableFuture<VolumeCheckResult>> entry : futures.entrySet()) { |
| |
| // Determine how much time we can allow for this check to complete. |
      // The cumulative wait time cannot exceed maxAllowedTimeForCheckMs.
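      // For example, with a 600000 ms budget, if earlier waits have used
      // 550000 ms, this future's get() is given at most 50000 ms.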
| final long waitSoFarMs = (timer.monotonicNow() - checkStartTimeMs); |
| final long timeLeftMs = Math.max(0, |
| maxAllowedTimeForCheckMs - waitSoFarMs); |
| final StorageLocation location = entry.getKey(); |
| |
| try { |
| final VolumeCheckResult result = |
| entry.getValue().get(timeLeftMs, TimeUnit.MILLISECONDS); |
| switch (result) { |
| case HEALTHY: |
| break; |
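        // A DEGRADED location is logged but kept in service, so it stays
        // in the set of good locations.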
| case DEGRADED: |
| LOG.warn("StorageLocation {} appears to be degraded.", location); |
| break; |
| case FAILED: |
| LOG.warn("StorageLocation {} detected as failed.", location); |
| failedLocations.add(location); |
| goodLocations.remove(location); |
| break; |
| default: |
| LOG.error("Unexpected health check result {} for StorageLocation {}", |
| result, location); |
| } |
      } catch (ExecutionException | TimeoutException e) {
        // An ExecutionException wraps the real failure in its cause; a
        // TimeoutException has no cause, so log the exception itself.
        LOG.warn("Exception checking StorageLocation {}", location,
            e instanceof ExecutionException ? e.getCause() : e);
        failedLocations.add(location);
        goodLocations.remove(location);
      }
| } |
| |
    // With the special limit value (-1), startup fails only when every
    // volume has failed; otherwise it fails when the number of failed
    // volumes exceeds the configured tolerance.
    final boolean tooManyVolumeFailures =
        maxVolumeFailuresTolerated
            == DataNode.MAX_VOLUME_FAILURE_TOLERATED_LIMIT
            ? failedLocations.size() == dataDirs.size()
            : failedLocations.size() > maxVolumeFailuresTolerated;
    if (tooManyVolumeFailures) {
      throw new DiskErrorException("Too many failed volumes - "
          + "current valid volumes: " + goodLocations.size()
          + ", volumes configured: " + dataDirs.size() + ", volumes failed: "
          + failedLocations.size() + ", volume failures tolerated: "
          + maxVolumeFailuresTolerated);
    }
| |
    if (goodLocations.isEmpty()) {
| throw new DiskErrorException("All directories in " |
| + DFS_DATANODE_DATA_DIR_KEY + " are invalid: " |
| + failedLocations); |
| } |
| |
| return new ArrayList<>(goodLocations.keySet()); |
| } |
| |
  /**
   * Shut down the underlying disk checker and wait up to the given grace
   * period for in-flight checks to complete.
   *
   * @param gracePeriod maximum time to wait for termination.
   * @param timeUnit unit of the grace period.
   */
  public void shutdownAndWait(int gracePeriod, TimeUnit timeUnit) {
| try { |
| delegateChecker.shutdownAndWait(gracePeriod, timeUnit); |
| } catch (InterruptedException e) { |
| LOG.warn("StorageLocationChecker interrupted during shutdown."); |
| Thread.currentThread().interrupt(); |
| } |
| } |
| } |