blob: 735a7326d5c6254263552709141a59991b6731fc [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.falcon.regression.core.util;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.Logger;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
/**
* Util methods related to hadoop.
*/
public final class HadoopUtil {

    /** Sentinel folder name appended by {@link #lateDataReplenish(FileSystem, int, int, String)}. */
    public static final String SOMETHING_RANDOM = "somethingRandom";
    private static final Logger LOGGER = Logger.getLogger(HadoopUtil.class);
    // Matches ":<port>/" inside a fully-qualified URI such as hdfs://server:port/path,
    // used to reduce such URIs to a bare path. Compiled once instead of per call.
    private static final Pattern URI_PORT_PATTERN = Pattern.compile(":[\\d]+/");

    private HadoopUtil() {
        throw new AssertionError("Instantiating utility class...");
    }

    /**
     * Lists all plain (non-directory) entries directly under a location.
     * @param fs filesystem to look at
     * @param location directory to list
     * @return full path strings of all files; empty list if location does not exist
     * @throws IOException on filesystem errors
     */
    public static List<String> getAllFilesHDFS(FileSystem fs, Path location) throws IOException {
        List<String> files = new ArrayList<String>();
        if (!fs.exists(location)) {
            return files;
        }
        FileStatus[] stats = fs.listStatus(location);
        for (FileStatus stat : stats) {
            if (!isDir(stat)) {
                files.add(stat.getPath().toString());
            }
        }
        return files;
    }

    /**
     * Recursively collects sub-directories of a location.
     * @param fs filesystem to look at
     * @param location starting directory
     * @param depth how many levels below each found directory to descend further
     * @return all directories found within the depth limit
     * @throws IOException on filesystem errors
     */
    public static List<Path> getAllDirsRecursivelyHDFS(
        FileSystem fs, Path location, int depth) throws IOException {
        List<Path> returnList = new ArrayList<Path>();
        FileStatus[] stats = fs.listStatus(location);
        for (FileStatus stat : stats) {
            if (isDir(stat)) {
                returnList.add(stat.getPath());
                if (depth > 0) {
                    returnList.addAll(getAllDirsRecursivelyHDFS(fs, stat.getPath(), depth - 1));
                }
            }
        }
        return returnList;
    }

    /**
     * Recursively collects all files under a location, skipping any path that
     * contains "_SUCCESS" (oozie/MR success markers).
     * @param fs filesystem to look at
     * @param location starting directory
     * @return all files found; empty list if the location vanished meanwhile
     * @throws IOException on filesystem errors other than a missing location
     */
    public static List<Path> getAllFilesRecursivelyHDFS(
        FileSystem fs, Path location) throws IOException {
        List<Path> returnList = new ArrayList<Path>();
        FileStatus[] stats;
        try {
            stats = fs.listStatus(location);
        } catch (FileNotFoundException e) {
            // Location may legitimately disappear between checks in tests; treat as empty,
            // but record the event through the class logger instead of stderr.
            LOGGER.info("Path not found: " + location, e);
            return returnList;
        }
        if (stats == null) {
            return returnList;
        }
        for (FileStatus stat : stats) {
            if (!isDir(stat)) {
                if (!stat.getPath().toUri().toString().contains("_SUCCESS")) {
                    returnList.add(stat.getPath());
                }
            } else {
                returnList.addAll(getAllFilesRecursivelyHDFS(fs, stat.getPath()));
            }
        }
        return returnList;
    }

    /** Wraps the deprecated {@link FileStatus#isDir()} in a single suppressed spot. */
    @SuppressWarnings("deprecation")
    private static boolean isDir(FileStatus stat) {
        return stat.isDir();
    }

    /**
     * Copies a local file or directory to an HDFS directory.
     * @param fs destination filesystem
     * @param dstHdfsDir destination directory on HDFS
     * @param srcFileLocation local source location
     * @throws IOException on copy failure
     */
    public static void copyDataToFolder(final FileSystem fs, final String dstHdfsDir,
                                        final String srcFileLocation)
        throws IOException {
        LOGGER.info(String.format("Copying local dir %s to hdfs location %s on %s",
            srcFileLocation, dstHdfsDir, fs.getUri()));
        fs.copyFromLocalFile(new Path(srcFileLocation), new Path(dstHdfsDir));
    }

    /**
     * Replaces an HDFS directory with the contents of a local directory
     * (deletes the destination first if it exists).
     * @param fs destination filesystem
     * @param dstHdfsDir destination directory on HDFS
     * @param localLocation local source directory
     * @throws IOException on delete/copy failure
     */
    public static void uploadDir(final FileSystem fs, final String dstHdfsDir,
                                 final String localLocation)
        throws IOException {
        LOGGER.info(String.format("Uploading local dir %s to hdfs location %s", localLocation,
            dstHdfsDir));
        HadoopUtil.deleteDirIfExists(dstHdfsDir, fs);
        HadoopUtil.copyDataToFolder(fs, dstHdfsDir, localLocation);
    }

    /**
     * Lists the simple names of all directories directly under a base directory.
     * @param fs filesystem to look at
     * @param baseDir directory to list
     * @return names (last path component) of all sub-directories
     * @throws IOException on filesystem errors
     */
    public static List<String> getHDFSSubFoldersName(FileSystem fs,
                                                     String baseDir) throws IOException {
        List<String> returnList = new ArrayList<String>();
        FileStatus[] stats = fs.listStatus(new Path(baseDir));
        for (FileStatus stat : stats) {
            if (isDir(stat)) {
                returnList.add(stat.getPath().getName());
            }
        }
        return returnList;
    }

    /**
     * Checks whether any file directly under an HDFS path contains the given
     * name fragment in its path.
     * @param fs filesystem to look at
     * @param hdfsPath directory to search
     * @param fileToCheckFor substring to match against file paths
     * @return true if a matching file is present
     * @throws IOException on filesystem errors
     */
    public static boolean isFilePresentHDFS(FileSystem fs, String hdfsPath, String fileToCheckFor)
        throws IOException {
        LOGGER.info("getting file from folder: " + hdfsPath);
        List<String> fileNames = getAllFileNamesFromHDFS(fs, hdfsPath);
        for (String filePath : fileNames) {
            if (filePath.contains(fileToCheckFor)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Lists the URI paths of all plain files directly under an HDFS path.
     * @param fs filesystem to look at
     * @param hdfsPath directory to list
     * @return URI paths of all non-directory entries
     * @throws IOException on filesystem errors
     */
    private static List<String> getAllFileNamesFromHDFS(
        FileSystem fs, String hdfsPath) throws IOException {
        List<String> returnList = new ArrayList<String>();
        LOGGER.info("getting file from folder: " + hdfsPath);
        FileStatus[] stats = fs.listStatus(new Path(hdfsPath));
        for (FileStatus stat : stats) {
            String currentPath = stat.getPath().toUri().getPath(); // gives directory name
            if (!isDir(stat)) {
                returnList.add(currentPath);
            }
        }
        return returnList;
    }

    /**
     * Deletes a directory if present, then creates it afresh.
     * @param fs filesystem to act on
     * @param path directory to recreate
     * @throws IOException on delete/mkdir failure
     */
    public static void recreateDir(FileSystem fs, String path) throws IOException {
        deleteDirIfExists(path, fs);
        LOGGER.info("creating hdfs dir: " + path + " on " + fs.getConf().get("fs.default.name"));
        fs.mkdirs(new Path(path));
    }

    /**
     * Recreates the same directory on multiple filesystems.
     * @param fileSystems filesystems to act on
     * @param path directory to recreate on each
     * @throws IOException on delete/mkdir failure
     */
    public static void recreateDir(List<FileSystem> fileSystems, String path) throws IOException {
        for (FileSystem fs : fileSystems) {
            recreateDir(fs, path);
        }
    }

    /**
     * Recursively deletes an HDFS path if it exists; logs either way.
     * @param hdfsPath path to delete
     * @param fs filesystem to act on
     * @throws IOException on delete failure
     */
    public static void deleteDirIfExists(String hdfsPath, FileSystem fs) throws IOException {
        Path path = new Path(hdfsPath);
        if (fs.exists(path)) {
            LOGGER.info(String.format("Deleting HDFS path: %s on %s", path, fs.getUri()));
            fs.delete(path, true);
        } else {
            LOGGER.info(String.format(
                "Not deleting non-existing HDFS path: %s on %s", path, fs.getUri()));
        }
    }

    /**
     * Convenience overload of {@link #flattenAndPutDataInFolder(FileSystem, String, String, List)}
     * with no remote prefix.
     */
    public static void flattenAndPutDataInFolder(FileSystem fs, String inputPath,
                                                 List<String> remoteLocations) throws IOException {
        flattenAndPutDataInFolder(fs, inputPath, "", remoteLocations);
    }

    /**
     * Copies all top-level files of a local path into each of the given remote
     * directories (flattening: sub-directories of the input are ignored).
     * @param fs destination filesystem
     * @param inputPath local file or directory supplying the files
     * @param remotePathPrefix prefix prepended to every remote location; if non-empty
     *                         it is deleted first
     * @param remoteLocations remote directories to fill
     * @return the final (prefix-joined, port-stripped) remote locations written to
     * @throws IOException on filesystem errors or if the local path cannot be listed
     */
    public static List<String> flattenAndPutDataInFolder(FileSystem fs, String inputPath,
                                                         String remotePathPrefix,
                                                         List<String> remoteLocations) throws IOException {
        if (StringUtils.isNotEmpty(remotePathPrefix)) {
            deleteDirIfExists(remotePathPrefix, fs);
        }
        LOGGER.info("Creating data in folders: \n" + remoteLocations);
        File input = new File(inputPath);
        File[] files = input.isDirectory() ? input.listFiles() : new File[]{input};
        // listFiles() returns null on I/O error or non-directory; fail loudly rather
        // than relying on a disabled-by-default assert.
        if (files == null) {
            throw new IOException("Unable to list files at: " + inputPath);
        }
        List<Path> filePaths = new ArrayList<Path>();
        for (final File file : files) {
            if (!file.isDirectory()) {
                final Path filePath = new Path(file.getAbsolutePath());
                filePaths.add(filePath);
            }
        }
        // Guard against an empty location list before peeking at element 0.
        if (!remoteLocations.isEmpty()
            && !remotePathPrefix.endsWith("/") && !remoteLocations.get(0).startsWith("/")) {
            remotePathPrefix += "/";
        }
        List<String> locations = new ArrayList<String>();
        for (String remoteDir : remoteLocations) {
            String remoteLocation = remotePathPrefix + remoteDir;
            // remove 'hdfs(hftp)://server:port'
            if (URI_PORT_PATTERN.matcher(remoteLocation).find()) {
                remoteLocation = remoteLocation.split(":[\\d]+")[1];
            }
            locations.add(remoteLocation);
            LOGGER.info(String.format("copying to: %s files: %s",
                fs.getUri() + remoteLocation, Arrays.toString(files)));
            if (!fs.exists(new Path(remoteLocation))) {
                fs.mkdirs(new Path(remoteLocation));
            }
            fs.copyFromLocalFile(false, true, filePaths.toArray(new Path[filePaths.size()]),
                new Path(remoteLocation));
        }
        return locations;
    }

    /**
     * For each folder in the list, writes a small marker file plus the given local
     * files into {@code folderPrefix + folder} on HDFS.
     * @param fs destination filesystem
     * @param folderPrefix prefix for every destination folder
     * @param folderList destination folders
     * @param fileLocations local files to copy into each folder
     * @throws IOException on filesystem errors
     */
    public static void copyDataToFolders(FileSystem fs, final String folderPrefix,
        List<String> folderList, String... fileLocations) throws IOException {
        for (final String folder : folderList) {
            boolean r;
            String folderSpace = folder.replaceAll("/", "_");
            File f = new File(OSUtil.NORMAL_INPUT + folderSpace + ".txt");
            if (!f.exists()) {
                r = f.createNewFile();
                if (!r) {
                    LOGGER.info("file could not be created");
                }
            }
            // Close the writer even if append() throws, so the temp file can be deleted.
            FileWriter fr = new FileWriter(f);
            try {
                fr.append("folder");
            } finally {
                fr.close();
            }
            fs.copyFromLocalFile(new Path(f.getAbsolutePath()), new Path(folderPrefix + folder));
            r = f.delete();
            if (!r) {
                LOGGER.info("delete was not successful");
            }
            Path[] srcPaths = new Path[fileLocations.length];
            for (int i = 0; i < srcPaths.length; ++i) {
                srcPaths[i] = new Path(fileLocations[i]);
            }
            LOGGER.info(String.format("copying %s to %s%s on %s", Arrays.toString(srcPaths),
                folderPrefix, folder, fs.getUri()));
            fs.copyFromLocalFile(false, true, srcPaths, new Path(folderPrefix + folder));
        }
    }

    /**
     * Creates minute-wise dated folders (plus one random folder) under a prefix
     * and fills them with the standard input data set.
     * @param fs destination filesystem
     * @param interval minutes on either side of now
     * @param minuteSkip gap between generated folders
     * @param folderPrefix prefix for every generated folder
     * @throws IOException on filesystem errors
     */
    public static void lateDataReplenish(FileSystem fs, int interval,
                                         int minuteSkip, String folderPrefix) throws IOException {
        List<String> folderData = TimeUtil.getMinuteDatesOnEitherSide(interval, minuteSkip);
        folderData.add(SOMETHING_RANDOM);
        flattenAndPutDataInFolder(fs, OSUtil.NORMAL_INPUT, folderPrefix, folderData);
    }

    /**
     * Creates each listed folder under the given prefix on HDFS.
     * @param fs destination filesystem
     * @param folderPrefix prefix for every folder
     * @param folderList folders to create
     * @throws IOException on mkdir failure
     */
    public static void createFolders(FileSystem fs, final String folderPrefix,
                                     List<String> folderList) throws IOException {
        for (final String folder : folderList) {
            fs.mkdirs(new Path(folderPrefix + folder));
        }
    }

    /**
     * Copies every top-level file of a local directory into a fresh, timestamped
     * sub-folder of a remote location.
     * @param fs destination filesystem
     * @param remoteLocation remote base directory
     * @param localLocation local directory supplying the files
     * @throws IOException on filesystem errors or if the local dir cannot be listed
     */
    public static void injectMoreData(FileSystem fs, final String remoteLocation,
                                      String localLocation) throws IOException {
        File[] files = new File(localLocation).listFiles();
        // Fail loudly instead of relying on a disabled-by-default assert.
        if (files == null) {
            throw new IOException("Unable to list files at: " + localLocation);
        }
        for (final File file : files) {
            if (!file.isDirectory()) {
                String path = remoteLocation + "/" + System.currentTimeMillis() / 1000 + "/";
                LOGGER.info("inserting data@ " + path);
                fs.copyFromLocalFile(new Path(file.getAbsolutePath()), new Path(path));
            }
        }
    }

    /**
     * Creates minute-wise dated folders and puts a single file into each:
     * the "_SUCCESS" marker if requested, otherwise the standard log file.
     * @param fs destination filesystem
     * @param interval minutes on either side of now
     * @param minuteSkip gap between generated folders
     * @param folderPrefix prefix for every generated folder
     * @param fileToBePut "_SUCCESS" to put the success marker, anything else for log_01.txt
     * @throws IOException on filesystem errors
     */
    public static void putFileInFolderHDFS(FileSystem fs, int interval, int minuteSkip,
                                           String folderPrefix, String fileToBePut)
        throws IOException {
        List<String> folderPaths = TimeUtil.getMinuteDatesOnEitherSide(interval, minuteSkip);
        LOGGER.info("folderData: " + folderPaths.toString());
        createFolders(fs, folderPrefix, folderPaths);
        if (fileToBePut.equals("_SUCCESS")) {
            copyDataToFolders(fs, folderPrefix, folderPaths, OSUtil.NORMAL_INPUT + "_SUCCESS");
        } else {
            copyDataToFolders(fs, folderPrefix, folderPaths, OSUtil.NORMAL_INPUT + "log_01.txt");
        }
    }

    /**
     * Like {@link #lateDataReplenish(FileSystem, int, int, String, String)} but copies
     * only the log file, not the "_SUCCESS" marker.
     * @param postFix optional suffix appended to every generated folder; may be null
     */
    public static void lateDataReplenishWithoutSuccess(FileSystem fs, int interval,
        int minuteSkip, String folderPrefix, String postFix) throws IOException {
        List<String> folderPaths = TimeUtil.getMinuteDatesOnEitherSide(interval, minuteSkip);
        LOGGER.info("folderData: " + folderPaths.toString());
        appendPostFix(folderPaths, postFix);
        createFolders(fs, folderPrefix, folderPaths);
        copyDataToFolders(fs, folderPrefix, folderPaths,
            OSUtil.NORMAL_INPUT + "log_01.txt");
    }

    /**
     * Creates minute-wise dated folders (optionally post-fixed) and copies the
     * "_SUCCESS" marker plus the standard log file into each.
     * @param postFix optional suffix appended to every generated folder; may be null
     */
    public static void lateDataReplenish(FileSystem fs, int interval, int minuteSkip,
                                         String folderPrefix, String postFix) throws IOException {
        List<String> folderPaths = TimeUtil.getMinuteDatesOnEitherSide(interval, minuteSkip);
        LOGGER.info("folderData: " + folderPaths.toString());
        appendPostFix(folderPaths, postFix);
        createFolders(fs, folderPrefix, folderPaths);
        copyDataToFolders(fs, folderPrefix, folderPaths,
            OSUtil.NORMAL_INPUT + "_SUCCESS", OSUtil.NORMAL_INPUT + "log_01.txt");
    }

    /** Appends postFix (if non-null) to every entry of folderPaths, in place. */
    private static void appendPostFix(List<String> folderPaths, String postFix) {
        if (postFix != null) {
            for (int i = 0; i < folderPaths.size(); i++) {
                folderPaths.set(i, folderPaths.get(i) + postFix);
            }
        }
    }

    /**
     * Stitch two or more hadoop paths. For eg: stitchHdfsPath("/tmp/", "/test") = "/tmp/test",
     * stitchHdfsPath("/tmp/", "test") = "/tmp/test", stitchHdfsPath("/tmp", "test") = "/tmp/test"
     * @param path1 the first path to be stitched
     * @param path2 the second path to be stitched
     * @param pathParts other paths to be stitched
     * @return final stitched path
     */
    public static String stitchHdfsPath(String path1, String path2, String... pathParts) {
        String retValue = stitchTwoHdfsPaths(path1, path2);
        for (String pathPart : pathParts) {
            retValue = stitchTwoHdfsPaths(retValue, pathPart);
        }
        return retValue;
    }

    /**
     * Stitch two hadoop paths. For eg: stitchTwoHdfsPaths("/tmp/", "/test") = "/tmp/test",
     * stitchTwoHdfsPaths("/tmp/", "test") = "/tmp/test",
     * stitchTwoHdfsPaths("/tmp", "test") = "/tmp/test"
     * @param path1 the first path to be stitched
     * @param path2 the second path to be stitched
     * @return final stitched path
     */
    private static String stitchTwoHdfsPaths(String path1, String path2) {
        return path1.replaceAll("/*$", "") + "/" + path2.replaceAll("^/*", "");
    }
}