blob: 2af4ed9980bd8a4d678c46585baad71b093be19f [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.raid;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.raid.protocol.PolicyInfo;
public class RaidFilter {
/**
 * Plain counters gathered while files are being filtered: how many files
 * were already raided, and the count and cumulative length of files that
 * were rejected for being too new or too small.
 */
static class Statistics {
  long numRaided = 0;
  long numTooNew = 0;
  long sizeTooNew = 0;
  long numTooSmall = 0;
  long sizeTooSmall = 0;

  /**
   * Adds each counter of {@code other} to the matching counter of this
   * object.
   *
   * @param other the statistics to fold into this instance
   */
  public void aggregate(Statistics other) {
    numRaided += other.numRaided;
    numTooNew += other.numTooNew;
    sizeTooNew += other.sizeTooNew;
    numTooSmall += other.numTooSmall;
    sizeTooSmall += other.sizeTooSmall;
  }

  /**
   * Renders all counters as a comma separated list of
   * {@code name = value} pairs.
   */
  @Override
  public String toString() {
    StringBuilder text = new StringBuilder();
    text.append("numRaided = ").append(numRaided);
    text.append(", numTooNew = ").append(numTooNew);
    text.append(", sizeTooNew = ").append(sizeTooNew);
    text.append(", numTooSmall = ").append(numTooSmall);
    text.append(", sizeTooSmall = ").append(sizeTooSmall);
    return text.toString();
  }
}
static class TimeBasedFilter extends Configured
implements DirectoryTraversal.FileFilter {
// Replication value that check() compares against FileStatus.getReplication()
// for files that already have a parity file.
int targetRepl;
// Destination prefix handed to RaidNode.getParityFile() when looking up the
// parity file of a source file.
Path raidDestPrefix;
// Added to a file's modification time and compared with startTime in check()
// to decide whether the file is too new to be selected.
long modTimePeriod;
// Reference time used by the "too new" comparisons.
long startTime;
// Counters updated by check(); replaced by the caller-supplied instance in the
// PolicyInfo-based constructor.
Statistics stats = new Statistics();
// URI path of the current policy's source path; stays null when the filter is
// built without a PolicyInfo.
String currentSrcPath = null;
// modTimePeriods[i] is the "modTimePeriod" property of the policy whose source
// path is otherSrcPaths[i]; both arrays are filled by initializeOtherPaths().
long[] modTimePeriods = new long[0];
// Source paths of the other policies that extend currentSrcPath; empty unless
// initializeOtherPaths() has been run.
String[] otherSrcPaths = new String[0];
/**
 * Creates a filter from explicitly supplied settings. No policy list is
 * involved, so the statistics object, the current source path and the
 * other-policy arrays keep their field-initializer values.
 *
 * @param conf configuration passed to {@link Configured}
 * @param destPrefix destination prefix used for parity file lookups
 * @param targetRepl replication expected of an already raided file
 * @param startTime reference time for the "too new" comparison
 * @param modTimePeriod period added to a file's modification time before
 *        it is compared with {@code startTime}
 */
TimeBasedFilter(Configuration conf, Path destPrefix, int targetRepl,
    long startTime, long modTimePeriod) {
  super(conf);
  this.startTime = startTime;
  this.modTimePeriod = modTimePeriod;
  this.targetRepl = targetRepl;
  this.raidDestPrefix = destPrefix;
}
TimeBasedFilter(Configuration conf,
Path destPrefix, PolicyInfo info,
List<PolicyInfo> allPolicies, long startTime, Statistics stats) {
super(conf);
this.raidDestPrefix = destPrefix;
this.targetRepl = Integer.parseInt(info.getProperty("targetReplication"));
this.modTimePeriod = Long.parseLong(info.getProperty("modTimePeriod"));
this.startTime = startTime;
this.stats = stats;
this.currentSrcPath = info.getSrcPath().toUri().getPath();
initializeOtherPaths(allPolicies);
}
/**
 * Fills {@code otherSrcPaths} and {@code modTimePeriods} with the policies
 * that could select a file more specifically than the current policy.
 *
 * A policy is kept only when its source path sorts strictly after
 * {@code currentSrcPath} and has {@code currentSrcPath} as a character
 * prefix. The survivors are sorted in reverse lexicographic order of their
 * source path and their "modTimePeriod" property is parsed into the
 * parallel {@code modTimePeriods} array.
 *
 * @param allPolicies every known policy; the list itself is not modified
 */
private void initializeOtherPaths(List<PolicyInfo> allPolicies) {
  List<PolicyInfo> candidates = new ArrayList<PolicyInfo>(allPolicies);
  // Remove all policies where srcPath <= currentSrcPath or
  // matchingPrefixLength is < length(currentSrcPath).
  // The policies remaining are the only ones that could better
  // select a file chosen by the current policy.
  for (Iterator<PolicyInfo> it = candidates.iterator(); it.hasNext(); ) {
    String src = it.next().getSrcPath().toUri().getPath();
    if (src.compareTo(currentSrcPath) <= 0) {
      it.remove();
      continue;
    }
    if (matchingPrefixLength(src, currentSrcPath) < currentSrcPath.length()) {
      it.remove();
    }
  }
  // Sort in reverse lexicographic order. The second argument is compared
  // against the first one to obtain the descending order; comparing an
  // element with itself would always yield 0 and leave the list unsorted.
  Collections.sort(candidates, new Comparator<PolicyInfo>() {
    public int compare(PolicyInfo p1, PolicyInfo p2) {
      String path1 = p1.getSrcPath().toUri().getPath();
      String path2 = p2.getSrcPath().toUri().getPath();
      return path2.compareTo(path1);
    }
  });
  otherSrcPaths = new String[candidates.size()];
  modTimePeriods = new long[otherSrcPaths.length];
  for (int i = 0; i < otherSrcPaths.length; i++) {
    PolicyInfo policy = candidates.get(i);
    otherSrcPaths[i] = policy.getSrcPath().toUri().getPath();
    modTimePeriods[i] = Long.parseLong(policy.getProperty("modTimePeriod"));
  }
}
/**
 * Decides whether a file should be selected.
 *
 * A file is rejected when another policy is a better match, or when its
 * length does not exceed two blocks. Otherwise the decision depends on the
 * parity file lookup: with a parity file present the file is selected only
 * if its replication differs from the target replication; without one it
 * is selected only if its modification time plus the modification period
 * lies before the start time.
 *
 * @param f the candidate file
 * @return true if the file is selected
 * @throws IOException declared by the filter interface
 */
public boolean check(FileStatus f) throws IOException {
  if (!canChooseForCurrentPolicy(f)) {
    return false;
  }

  // Files spanning at most two blocks are skipped.
  long twoBlocks = 2 * f.getBlockSize();
  if (f.getLen() <= twoBlocks) {
    stats.numTooSmall++;
    stats.sizeTooSmall += f.getLen();
    return false;
  }

  try {
    Object parity = RaidNode.getParityFile(
        raidDestPrefix, f.getPath(), getConf());

    if (parity == null) {
      // No parity file yet: select the file unless it is too new.
      boolean oldEnough = f.getModificationTime() + modTimePeriod < startTime;
      if (!oldEnough) {
        stats.numTooNew++;
        stats.sizeTooNew += f.getLen();
      }
      return oldEnough;
    }

    // A parity file exists. If the source is already at the target
    // replication there is nothing left to do for it.
    if (f.getReplication() == targetRepl) {
      stats.numRaided++;
      return false;
    }
    // Select the file so that its replication can be set.
    return true;
  } catch (java.io.FileNotFoundException e) {
    // The destination file does not exist.
    return true;
  } catch (java.io.IOException e) {
    // If there is a problem with the har path, this will let us continue.
    DirectoryTraversal.LOG.error(
        "Error while selecting " + StringUtils.stringifyException(e));
    return false;
  }
}
/**
 * Checks if a file can be chosen for the current policy.
 *
 * The file is left to another policy when that policy's source path shares
 * a longer character prefix with the file path than the current policy's
 * source path does, provided the file is not too new for that other
 * policy. "Too new" uses the same boundary as {@code check}: a file is old
 * enough only when its modification time plus the period is strictly
 * before {@code startTime}. With the boundary aligned, a file that the
 * other policy would reject as too new is never withheld from the current
 * policy.
 *
 * NOTE(review): the prefix comparison is character based, not path
 * component based, so a sibling such as "/a/bc" shares a prefix with a
 * policy on "/a/b" - confirm that this is intended.
 *
 * @param stat the candidate file
 * @return true if no other policy is a better match for the file
 */
boolean canChooseForCurrentPolicy(FileStatus stat) {
  if (otherSrcPaths.length == 0) {
    return true;
  }
  String fileStr = stat.getPath().toUri().getPath();
  // For a given string, find the best matching srcPath.
  int matchWithCurrent = matchingPrefixLength(fileStr, currentSrcPath);
  for (int i = 0; i < otherSrcPaths.length; i++) {
    // If the file is too new for the other policy, move to the next.
    if (stat.getModificationTime() + modTimePeriods[i] >= startTime) {
      continue;
    }
    int matchLen = matchingPrefixLength(fileStr, otherSrcPaths[i]);
    // Do not count a trailing separator as part of the match.
    if (matchLen > 0 &&
        fileStr.charAt(matchLen - 1) == Path.SEPARATOR_CHAR) {
      matchLen--;
    }
    if (matchLen > matchWithCurrent) {
      return false;
    }
  }
  return true;
}
/**
 * Counts the leading characters that two strings have in common.
 *
 * @param s1 first string
 * @param s2 second string
 * @return the number of positions, starting at index 0, at which both
 *         strings hold the same character before the first difference or
 *         the end of the shorter string
 */
int matchingPrefixLength(final String s1, final String s2) {
  int common = 0;
  while (common < s1.length() && common < s2.length()
      && s1.charAt(common) == s2.charAt(common)) {
    common++;
  }
  return common;
}
}
/**
 * Filter that gives precedence to a first-choice destination: a file that
 * already has a parity file under the first-choice prefix is never
 * selected, every other file is judged by a {@link TimeBasedFilter} built
 * for the second-choice prefix.
 */
static class PreferenceFilter extends Configured
    implements DirectoryTraversal.FileFilter {
  Path firstChoicePrefix;
  DirectoryTraversal.FileFilter secondChoiceFilter;

  /**
   * Creates a preference filter whose second-choice filter is built from
   * explicitly supplied settings.
   */
  PreferenceFilter(Configuration conf,
      Path firstChoicePrefix, Path secondChoicePrefix,
      int targetRepl, long startTime, long modTimePeriod) {
    super(conf);
    this.firstChoicePrefix = firstChoicePrefix;
    TimeBasedFilter fallback = new TimeBasedFilter(
        conf, secondChoicePrefix, targetRepl, startTime, modTimePeriod);
    this.secondChoiceFilter = fallback;
  }

  /**
   * Creates a preference filter whose second-choice filter is built from a
   * policy, the list of all policies and a shared statistics object.
   */
  PreferenceFilter(Configuration conf,
      Path firstChoicePrefix, Path secondChoicePrefix,
      PolicyInfo info, List<PolicyInfo> allPolicies, long startTime,
      Statistics stats) {
    super(conf);
    this.firstChoicePrefix = firstChoicePrefix;
    TimeBasedFilter fallback = new TimeBasedFilter(
        conf, secondChoicePrefix, info, allPolicies, startTime, stats);
    this.secondChoiceFilter = fallback;
  }

  /**
   * Rejects files that already have a parity file under the first-choice
   * prefix and delegates the decision for all other files to the
   * second-choice filter.
   *
   * @param f the candidate file
   * @return true if the file is selected
   * @throws IOException if the parity lookup or the delegate fails
   */
  public boolean check(FileStatus f) throws IOException {
    Object existingParity =
        RaidNode.getParityFile(firstChoicePrefix, f.getPath(), getConf());
    if (existingParity != null) {
      // There is already a parity file under the first choice path,
      // so this file is not chosen.
      return false;
    }
    // The decision is up to the second choice filter.
    return secondChoiceFilter.check(f);
  }
}
}