// blob: b990bc5a0d4842fd22218a00596d80d22520070c [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode.sps;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.util.Daemon;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
/**
 * Tracks the block collection IDs (inode IDs) that are candidates for
 * physical storage movement.
 *
 * <p>The class keeps a FIFO queue of {@link ItemInfo} entries waiting to be
 * examined, plus a per-start-path record of how many queued children are still
 * pending and whether the scan of that start path has finished. A background
 * {@link SPSPathIdProcessor} thread pulls start paths from the {@link Context},
 * asks the context to scan them, and removes the SPS hint (xAttr) once a
 * start path is fully scanned with no pending children left.
 *
 * <p>All accesses to the queue and to the pending-work map are guarded by this
 * instance's monitor.
 */
@InterfaceAudience.Private
public class BlockStorageMovementNeeded {

  public static final Logger LOG =
      LoggerFactory.getLogger(BlockStorageMovementNeeded.class);

  /**
   * Time the path-id processor sleeps when there is nothing to scan, when the
   * context reports safe mode, or after a failed scan attempt.
   */
  private static final long PATH_SCAN_WAIT_INTERVAL_MS = 3000;

  /** Items waiting to be checked for block storage movement. */
  private final Queue<ItemInfo> storageMovementNeeded = new LinkedList<>();

  /**
   * Map of start path id to its pending work. The pending count is the number
   * of queued files under that start path still waiting to satisfy the policy.
   */
  private final Map<Long, DirPendingWorkInfo> pendingWorkForDirectory =
      new HashMap<>();

  private final Context ctxt;

  /** Background thread running {@link #pathIDProcessor}; set by activate(). */
  private Daemon pathIdCollector;

  private final SPSPathIdProcessor pathIDProcessor;

  // Amount of time to cache the SUCCESS status of path before turning it to
  // NOT_AVAILABLE.
  private static long statusClearanceElapsedTimeMs = 300000;

  /**
   * Creates the tracker. The background processor is created here but is not
   * started until {@link #activate()} is called.
   *
   * @param context
   *          - context used to query state and to remove SPS hints
   */
  public BlockStorageMovementNeeded(Context context) {
    this.ctxt = context;
    pathIDProcessor = new SPSPathIdProcessor();
  }

  /**
   * Add the candidate to tracking list for which storage movement
   * expected if necessary. Pending directory stats are not updated.
   *
   * @param trackInfo
   *          - track info for satisfy the policy
   */
  public synchronized void add(ItemInfo trackInfo) {
    storageMovementNeeded.add(trackInfo);
  }

  /**
   * Add the itemInfo list to tracking list for which storage movement expected
   * if necessary, and account for them in the start path's pending work.
   *
   * @param startPath
   *          - start path
   * @param itemInfoList
   *          - List of child in the directory
   * @param scanCompleted
   *          -Indicates whether the start id directory has no more elements to
   *          scan.
   */
  @VisibleForTesting
  public synchronized void addAll(long startPath, List<ItemInfo> itemInfoList,
      boolean scanCompleted) {
    storageMovementNeeded.addAll(itemInfoList);
    updatePendingDirScanStats(startPath, itemInfoList.size(), scanCompleted);
  }

  /**
   * Add the itemInfo to tracking list for which storage movement expected if
   * necessary.
   *
   * @param itemInfo
   *          - item to queue; when its start path equals its file id the
   *          pending directory stats are left untouched
   * @param scanCompleted
   *          -Indicates whether the ItemInfo start id directory has no more
   *          elements to scan.
   */
  @VisibleForTesting
  public synchronized void add(ItemInfo itemInfo, boolean scanCompleted) {
    storageMovementNeeded.add(itemInfo);
    // This represents sps start id is file, so no need to update pending dir
    // stats.
    if (itemInfo.getStartPath() == itemInfo.getFile()) {
      return;
    }
    updatePendingDirScanStats(itemInfo.getStartPath(), 1, scanCompleted);
  }

  /**
   * Adds {@code numScannedFiles} to the pending work of {@code startPath},
   * creating the record on first use, and optionally marks the scan of that
   * start path as completed. Callers must hold this instance's monitor.
   */
  private void updatePendingDirScanStats(long startPath, int numScannedFiles,
      boolean scanCompleted) {
    DirPendingWorkInfo pendingWork = pendingWorkForDirectory.get(startPath);
    if (pendingWork == null) {
      pendingWork = new DirPendingWorkInfo();
      pendingWorkForDirectory.put(startPath, pendingWork);
    }
    pendingWork.addPendingWorkCount(numScannedFiles);
    if (scanCompleted) {
      pendingWork.markScanCompleted();
    }
  }

  /**
   * Gets the satisfier files for which block storage movements check necessary
   * and make the movement if required.
   *
   * @return the head of the queue (which is removed), or {@code null} when
   *         the queue is empty
   */
  public synchronized ItemInfo get() {
    return storageMovementNeeded.poll();
  }

  /**
   * Returns queue size.
   */
  public synchronized int size() {
    return storageMovementNeeded.size();
  }

  /**
   * Asks the context to remove all SPS path ids, then empties the queue and
   * the pending-work map.
   */
  public synchronized void clearAll() {
    ctxt.removeAllSPSPathIds();
    storageMovementNeeded.clear();
    pendingWorkForDirectory.clear();
  }

  /**
   * Decrease the pending child count for directory once one file blocks moved
   * successfully. Remove the SPS xAttr if pending child count is zero.
   *
   * @param trackInfo
   *          - the item whose processing has finished
   * @param isSuccess
   *          - not consulted by this method
   * @throws IOException
   *           if the context fails while checking the start path or removing
   *           the SPS hint
   */
  public synchronized void removeItemTrackInfo(ItemInfo trackInfo,
      boolean isSuccess) throws IOException {
    if (trackInfo.isDir()) {
      // If track is part of some start inode then reduce the pending
      // directory work count.
      long startId = trackInfo.getStartPath();
      if (!ctxt.isFileExist(startId)) {
        // directory deleted just remove it.
        this.pendingWorkForDirectory.remove(startId);
      } else {
        DirPendingWorkInfo pendingWork = pendingWorkForDirectory.get(startId);
        if (pendingWork != null) {
          pendingWork.decrementPendingWorkCount();
          if (pendingWork.isDirWorkDone()) {
            ctxt.removeSPSHint(startId);
            pendingWorkForDirectory.remove(startId);
          }
        }
      }
    } else {
      // Remove xAttr if trackID doesn't exist in
      // storageMovementAttemptedItems or file policy satisfied.
      ctxt.removeSPSHint(trackInfo.getFile());
    }
  }

  /**
   * Removes everything tracked for {@code trackId}: the SPS path id held by
   * the context, every queued item whose file id equals {@code trackId}, and
   * the pending-work record keyed by {@code trackId}.
   *
   * @param trackId
   *          - id to stop tracking
   */
  public synchronized void clearQueue(long trackId) {
    ctxt.removeSPSPathId(trackId);
    Iterator<ItemInfo> iterator = storageMovementNeeded.iterator();
    while (iterator.hasNext()) {
      ItemInfo next = iterator.next();
      if (next.getFile() == trackId) {
        iterator.remove();
      }
    }
    pendingWorkForDirectory.remove(trackId);
  }

  /**
   * Drains the SPS paths still held by the context and the items queued here,
   * removing the SPS xAttr for each drained path and for each queued
   * non-directory item, and finally calls {@link #clearAll()}. Failures to
   * remove an xAttr are logged and do not stop the clean up.
   */
  public synchronized void clearQueuesWithNotification() {
    // Remove xAttr from directories
    Long trackId;
    while ((trackId = ctxt.getNextSPSPath()) != null) {
      try {
        // Remove xAttr for file
        ctxt.removeSPSHint(trackId);
      } catch (IOException ie) {
        LOG.warn("Failed to remove SPS xattr for track id " + trackId, ie);
      }
    }

    // File's directly added to storageMovementNeeded, So try to remove
    // xAttr for file
    ItemInfo itemInfo;
    while ((itemInfo = get()) != null) {
      try {
        // Remove xAttr for file
        if (!itemInfo.isDir()) {
          ctxt.removeSPSHint(itemInfo.getFile());
        }
      } catch (IOException ie) {
        LOG.warn(
            "Failed to remove SPS xattr for track id "
                + itemInfo.getFile(), ie);
      }
    }
    this.clearAll();
  }

  /**
   * Removes the SPS hint and the pending-work record of {@code startINode}
   * when its scan is complete and no queued child is pending (for example an
   * empty directory). Does nothing when no record exists or work remains.
   *
   * <p>The map is only touched while holding this instance's monitor; the
   * call into the context is made outside of it. The record is dropped only
   * after the hint was removed, so a failure leaves it in place for a retry.
   */
  private void removeSPSHintIfDirWorkDone(long startINode)
      throws IOException {
    synchronized (this) {
      DirPendingWorkInfo dirPendingWorkInfo =
          pendingWorkForDirectory.get(startINode);
      if (dirPendingWorkInfo == null
          || !dirPendingWorkInfo.isDirWorkDone()) {
        return;
      }
    }
    ctxt.removeSPSHint(startINode);
    synchronized (this) {
      pendingWorkForDirectory.remove(startINode);
    }
  }

  /**
   * Repeatedly takes the next SPS start path from the context and asks the
   * context to scan it and collect the child IDs to process for satisfying
   * the policy. Runs until the context stops running or the thread is
   * interrupted.
   */
  private class SPSPathIdProcessor implements Runnable {

    @Override
    public void run() {
      LOG.info("Starting SPSPathIdProcessor!.");
      Long startINode = null;
      while (ctxt.isRunning()) {
        try {
          if (ctxt.isInSafeMode()) {
            // Scanning is skipped in safe mode; wait before checking again
            // instead of spinning on the condition.
            Thread.sleep(PATH_SCAN_WAIT_INTERVAL_MS);
            continue;
          }
          if (startINode == null) {
            startINode = ctxt.getNextSPSPath();
          } // else same id will be retried
          if (startINode == null) {
            // Waiting for SPS path
            Thread.sleep(PATH_SCAN_WAIT_INTERVAL_MS);
          } else {
            ctxt.scanAndCollectFiles(startINode);
            // check if directory was empty and no child added to queue
            removeSPSHintIfDirWorkDone(startINode);
          }
          startINode = null; // Current inode successfully scanned.
        } catch (Throwable t) {
          if (t instanceof InterruptedException) {
            LOG.info("SPSPathIdProcessor thread is interrupted. Stopping..");
            Thread.currentThread().interrupt();
            break;
          }
          LOG.warn("Exception while scanning file inodes to satisfy the policy",
              t);
          try {
            // startINode is left as is, so the same id is retried after the
            // wait.
            Thread.sleep(PATH_SCAN_WAIT_INTERVAL_MS);
          } catch (InterruptedException e) {
            LOG.info("Interrupted while waiting in SPSPathIdProcessor", e);
            Thread.currentThread().interrupt();
            break;
          }
        }
      }
    }
  }

  /**
   * Info for directory recursive scan: the number of pending children and
   * whether the directory has been fully scanned.
   */
  public static class DirPendingWorkInfo {

    private int pendingWorkCount = 0;
    private boolean fullyScanned = false;

    /**
     * Increment the pending work count for directory.
     *
     * @param count
     *          - number of newly queued children
     */
    public synchronized void addPendingWorkCount(int count) {
      this.pendingWorkCount = this.pendingWorkCount + count;
    }

    /**
     * Decrement the pending work count for directory one track info is
     * completed.
     */
    public synchronized void decrementPendingWorkCount() {
      this.pendingWorkCount--;
    }

    /**
     * Return true if all the pending work is done and directory fully
     * scanned, otherwise false.
     */
    public synchronized boolean isDirWorkDone() {
      return (pendingWorkCount <= 0 && fullyScanned);
    }

    /**
     * Mark directory scan is completed.
     */
    public synchronized void markScanCompleted() {
      this.fullyScanned = true;
    }
  }

  /**
   * Starts the background path-id processor in a new daemon thread named
   * "SPSPathIdProcessor".
   */
  public void activate() {
    pathIdCollector = new Daemon(pathIDProcessor);
    pathIdCollector.setName("SPSPathIdProcessor");
    pathIdCollector.start();
  }

  /**
   * Interrupts the background path-id processor thread, if one was created
   * by {@link #activate()}.
   */
  public void close() {
    if (pathIdCollector != null) {
      pathIdCollector.interrupt();
    }
  }

  @VisibleForTesting
  public static void setStatusClearanceElapsedTimeMs(
      long statusClearanceElapsedTimeMs) {
    BlockStorageMovementNeeded.statusClearanceElapsedTimeMs =
        statusClearanceElapsedTimeMs;
  }

  @VisibleForTesting
  public static long getStatusClearanceElapsedTimeMs() {
    return statusClearanceElapsedTimeMs;
  }

  /**
   * Marks the scan of the given start path as completed, if pending work is
   * being tracked for it. Synchronized because the pending-work map is
   * modified by other threads under this instance's monitor.
   *
   * @param inode
   *          - start path id whose scan has finished
   */
  public synchronized void markScanCompletedForDir(long inode) {
    DirPendingWorkInfo pendingWork = pendingWorkForDirectory.get(inode);
    if (pendingWork != null) {
      pendingWork.markScanCompleted();
    }
  }
}