| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.drill.exec.store.dfs; |
| |
| import org.apache.drill.common.exceptions.DrillRuntimeException; |
| import org.apache.drill.exec.util.DrillFileSystemUtil; |
| import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; |
| import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch; |
| import org.apache.drill.shaded.guava.com.google.common.collect.Lists; |
| import org.apache.hadoop.fs.FileStatus; |
| import org.apache.hadoop.fs.Path; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.io.IOException; |
| import java.net.URI; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.List; |
| import java.util.Objects; |
| import java.util.concurrent.TimeUnit; |
| import java.util.stream.Collectors; |
| |
| /** |
| * Jackson serializable description of a file selection. |
| */ |
| public class FileSelection { |
| |
private static final Logger logger = LoggerFactory.getLogger(FileSelection.class);
// Glob character searched for in paths to detect wildcard selections and to trim the selection root.
private static final String WILD_CARD = "*";

/**
 * File statuses of this selection. May be null, in which case they are resolved from
 * {@link #files} on the first call to {@link #getStatuses(DrillFileSystem)}.
 */
private List<FileStatus> statuses;

/**
 * Paths of this selection. May be null, in which case they are derived from the
 * statuses on the first call to {@link #getFiles()}.
 */
public List<Path> files;
/**
 * root path for the selections
 */
public final Path selectionRoot;
/**
 * root path for the metadata cache file (if any)
 */
public final Path cacheFileRoot;

/**
 * metadata context useful for metadata operations (if any)
 */
private MetadataContext metaContext = null;

/**
 * Indicates whether this selectionRoot is an empty directory
 */
private boolean emptyDirectory;

/**
 * Tracks what is known about directories in this selection and how far it has been expanded.
 */
private enum StatusType {
  NOT_CHECKED, // initial state
  NO_DIRS, // no directories in this selection
  HAS_DIRS, // directories were found in the selection
  EXPANDED_FULLY, // whether selection fully expanded to files
  EXPANDED_PARTIAL // whether selection partially expanded to only directories (not files)
}

// current directory/expansion state, see StatusType
private StatusType dirStatus;
// whether this selection previously had a wildcard, false by default
private boolean hadWildcard;
// whether all partitions were previously pruned for this selection, false by default
private boolean wasAllPartitionsPruned;
| |
/**
 * Builds a {@link FileSelection selection} from file statuses and/or files under the given root,
 * with no metadata cache root, the "all partitions pruned" flag cleared and the directory
 * status not yet checked.
 *
 * @param statuses list of file statuses
 * @param files list of files
 * @param selectionRoot root path for selections
 */
public FileSelection(List<FileStatus> statuses, List<Path> files, Path selectionRoot) {
  this(statuses, files, selectionRoot, null, false);
}
| |
/**
 * Builds a selection whose directory status has not been checked yet.
 *
 * @param statuses list of file statuses
 * @param files list of files
 * @param selectionRoot root path for selections
 * @param cacheFileRoot root path for the metadata cache file (if any)
 * @param wasAllPartitionsPruned whether all partitions were previously pruned for this selection
 */
public FileSelection(List<FileStatus> statuses,
                     List<Path> files,
                     Path selectionRoot,
                     Path cacheFileRoot,
                     boolean wasAllPartitionsPruned) {
  this(statuses, files, selectionRoot, cacheFileRoot, wasAllPartitionsPruned, StatusType.NOT_CHECKED);
}
| |
/**
 * Primary constructor; stores every argument as given, without copying or validation.
 *
 * @param statuses list of file statuses
 * @param files list of files
 * @param selectionRoot root path for selections
 * @param cacheFileRoot root path for the metadata cache file (if any)
 * @param wasAllPartitionsPruned whether all partitions were previously pruned for this selection
 * @param dirStatus initial directory/expansion status
 */
public FileSelection(List<FileStatus> statuses,
                     List<Path> files,
                     Path selectionRoot,
                     Path cacheFileRoot,
                     boolean wasAllPartitionsPruned,
                     StatusType dirStatus) {
  // roots
  this.selectionRoot = selectionRoot;
  this.cacheFileRoot = cacheFileRoot;
  // content
  this.statuses = statuses;
  this.files = files;
  // state flags
  this.dirStatus = dirStatus;
  this.wasAllPartitionsPruned = wasAllPartitionsPruned;
}
| |
/**
 * Copy constructor. Every field is taken over from the given selection; the status and
 * file lists are shared with the source, not duplicated.
 *
 * @param selection selection to copy, must not be null
 */
protected FileSelection(FileSelection selection) {
  Preconditions.checkNotNull(selection, "File selection cannot be null");
  // roots
  this.selectionRoot = selection.selectionRoot;
  this.cacheFileRoot = selection.cacheFileRoot;
  // content
  this.statuses = selection.statuses;
  this.files = selection.files;
  // state
  this.dirStatus = selection.dirStatus;
  this.hadWildcard = selection.hadWildcard;
  this.wasAllPartitionsPruned = selection.wasAllPartitionsPruned;
  this.emptyDirectory = selection.emptyDirectory;
  this.metaContext = selection.metaContext;
}
| |
| public Path getSelectionRoot() { |
| return selectionRoot; |
| } |
| |
| public List<FileStatus> getStatuses(DrillFileSystem fs) throws IOException { |
| Stopwatch timer = logger.isDebugEnabled() ? Stopwatch.createStarted() : null; |
| |
| if (statuses == null) { |
| List<FileStatus> newStatuses = new ArrayList<>(); |
| for (Path pathStr : Objects.requireNonNull(files, "Files can not be null if statuses are null")) { |
| newStatuses.add(fs.getFileStatus(pathStr)); |
| } |
| statuses = newStatuses; |
| } |
| if (timer != null) { |
| logger.debug("FileSelection.getStatuses() took {} ms, numFiles: {}", |
| timer.elapsed(TimeUnit.MILLISECONDS), statuses == null ? 0 : statuses.size()); |
| timer.stop(); |
| } |
| |
| return statuses; |
| } |
| |
| public List<Path> getFiles() { |
| if (files == null) { |
| files = Objects.requireNonNull(statuses, "Statuses can not be null if files are null").stream() |
| .map(FileStatus::getPath) |
| .collect(Collectors.toList()); |
| } |
| return files; |
| } |
| |
| public boolean containsDirectories(DrillFileSystem fs) throws IOException { |
| if (dirStatus == StatusType.NOT_CHECKED) { |
| dirStatus = StatusType.NO_DIRS; |
| for (FileStatus status : getStatuses(fs)) { |
| if (status.isDirectory()) { |
| dirStatus = StatusType.HAS_DIRS; |
| break; |
| } |
| } |
| } |
| return dirStatus == StatusType.HAS_DIRS; |
| } |
| |
| public FileSelection minusDirectories(DrillFileSystem fs) throws IOException { |
| if (isExpandedFully()) { |
| return this; |
| } |
| Stopwatch timer = logger.isDebugEnabled() ? Stopwatch.createStarted() : null; |
| List<FileStatus> statuses = getStatuses(fs); |
| |
| List<FileStatus> nonDirectories = Lists.newArrayList(); |
| for (FileStatus status : statuses) { |
| nonDirectories.addAll(DrillFileSystemUtil.listFiles(fs, status.getPath(), true)); |
| } |
| |
| FileSelection fileSel = create(nonDirectories, null, selectionRoot); |
| if (timer != null) { |
| logger.debug("FileSelection.minusDirectories() took {} ms, numFiles: {}", timer.elapsed(TimeUnit.MILLISECONDS), statuses.size()); |
| timer.stop(); |
| } |
| |
| // fileSel will be null if we query an empty folder |
| if (fileSel != null) { |
| fileSel.setExpandedFully(); |
| } |
| |
| return fileSel; |
| } |
| |
| public FileStatus getFirstPath(DrillFileSystem fs) throws IOException { |
| return getStatuses(fs).get(0); |
| } |
| |
| public void setExpandedFully() { |
| this.dirStatus = StatusType.EXPANDED_FULLY; |
| } |
| |
| public boolean isExpandedFully() { |
| return dirStatus == StatusType.EXPANDED_FULLY; |
| } |
| |
| public void setExpandedPartial() { |
| this.dirStatus = StatusType.EXPANDED_PARTIAL; |
| } |
| |
| public boolean isExpandedPartial() { |
| return dirStatus == StatusType.EXPANDED_PARTIAL; |
| } |
| |
| public StatusType getDirStatus() { |
| return dirStatus; |
| } |
| |
| public boolean wasAllPartitionsPruned() { |
| return this.wasAllPartitionsPruned; |
| } |
| |
| /** |
| * Returns longest common path for the given list of files. |
| * |
| * @param files list of files. |
| * @return longest common path |
| */ |
| private static Path commonPathForFiles(List<Path> files) { |
| if (files == null || files.isEmpty()) { |
| return new Path("/"); |
| } |
| |
| int total = files.size(); |
| String[][] folders = new String[total][]; |
| int shortest = Integer.MAX_VALUE; |
| for (int i = 0; i < total; i++) { |
| folders[i] = files.get(i).toUri().getPath().split(Path.SEPARATOR); |
| shortest = Math.min(shortest, folders[i].length); |
| } |
| |
| int latest; |
| out: |
| for (latest = 0; latest < shortest; latest++) { |
| String current = folders[0][latest]; |
| for (int i = 1; i < folders.length; i++) { |
| if (!current.equals(folders[i][latest])) { |
| break out; |
| } |
| } |
| } |
| URI uri = files.get(0).toUri(); |
| String pathString = buildPath(folders[0], latest); |
| return new Path(uri.getScheme(), uri.getAuthority(), pathString); |
| } |
| |
| private static String buildPath(String[] path, int folderIndex) { |
| StringBuilder builder = new StringBuilder(); |
| for (int i=0; i<folderIndex; i++) { |
| builder.append(path[i]).append(Path.SEPARATOR); |
| } |
| builder.deleteCharAt(builder.length()-1); |
| return builder.toString(); |
| } |
| |
| public static FileSelection create(DrillFileSystem fs, String parent, String path, |
| boolean allowAccessOutsideWorkspace) throws IOException { |
| Stopwatch timer = logger.isDebugEnabled() ? Stopwatch.createStarted() : null; |
| boolean hasWildcard = path.contains(WILD_CARD); |
| |
| Path combined = new Path(parent, removeLeadingSlash(path)); |
| if (!allowAccessOutsideWorkspace) { |
| checkBackPaths(new Path(parent).toUri().getPath(), combined.toUri().getPath(), path); |
| } |
| FileStatus[] statuses = fs.globStatus(combined); // note: this would expand wildcards |
| if (statuses == null) { |
| return null; |
| } |
| FileSelection fileSel = create(Arrays.asList(statuses), null, combined); |
| if (timer != null) { |
| logger.debug("FileSelection.create() took {} ms ", timer.elapsed(TimeUnit.MILLISECONDS)); |
| timer.stop(); |
| } |
| if (fileSel == null) { |
| return null; |
| } |
| fileSel.setHadWildcard(hasWildcard); |
| return fileSel; |
| |
| } |
| |
| /** |
| * Creates a {@link FileSelection selection} with the given file statuses/files and selection root. |
| * |
| * @param statuses list of file statuses |
| * @param files list of files |
| * @param root root path for selections |
| * @param cacheFileRoot root path for metadata cache (null for no metadata cache) |
| * @return null if creation of {@link FileSelection} fails with an {@link IllegalArgumentException} |
| * otherwise a new selection. |
| * |
| * @see FileSelection#FileSelection(List, List, Path) |
| */ |
| public static FileSelection create(List<FileStatus> statuses, List<Path> files, Path root, |
| Path cacheFileRoot, boolean wasAllPartitionsPruned) { |
| boolean bothNonEmptySelection = (statuses != null && statuses.size() > 0) && (files != null && files.size() > 0); |
| boolean bothEmptySelection = (statuses == null || statuses.size() == 0) && (files == null || files.size() == 0); |
| |
| if (bothNonEmptySelection || bothEmptySelection) { |
| return null; |
| } |
| |
| Path selectionRoot; |
| if (statuses == null || statuses.isEmpty()) { |
| selectionRoot = commonPathForFiles(files); |
| } else { |
| Objects.requireNonNull(root, "Selection root is null"); |
| Path rootPath = handleWildCard(root); |
| URI uri = statuses.get(0).getPath().toUri(); |
| selectionRoot = new Path(uri.getScheme(), uri.getAuthority(), rootPath.toUri().getPath()); |
| } |
| return new FileSelection(statuses, files, selectionRoot, cacheFileRoot, wasAllPartitionsPruned); |
| } |
| |
| public static FileSelection create(List<FileStatus> statuses, List<Path> files, Path root) { |
| return FileSelection.create(statuses, files, root, null, false); |
| } |
| |
| public static FileSelection createFromDirectories(List<Path> dirPaths, FileSelection selection, |
| Path cacheFileRoot) { |
| Stopwatch timer = logger.isDebugEnabled() ? Stopwatch.createStarted() : null; |
| Path root = selection.getSelectionRoot(); |
| Objects.requireNonNull(root, "Selection root is null"); |
| if (dirPaths == null || dirPaths.isEmpty()) { |
| throw new DrillRuntimeException("List of directories is null or empty"); |
| } |
| |
| // for wildcard the directory list should have already been expanded |
| List<Path> dirs = selection.hadWildcard() ? selection.getFileStatuses().stream() |
| .map(FileStatus::getPath) |
| .collect(Collectors.toList()) : new ArrayList<>(dirPaths); |
| |
| Path rootPath = handleWildCard(root); |
| URI uri = selection.getFileStatuses().get(0).getPath().toUri(); |
| Path path = new Path(uri.getScheme(), uri.getAuthority(), rootPath.toUri().getPath()); |
| FileSelection fileSel = new FileSelection(null, dirs, path, cacheFileRoot, false); |
| fileSel.setHadWildcard(selection.hadWildcard()); |
| if (timer != null) { |
| logger.debug("FileSelection.createFromDirectories() took {} ms ", timer.elapsed(TimeUnit.MILLISECONDS)); |
| timer.stop(); |
| } |
| return fileSel; |
| } |
| |
| private static Path handleWildCard(Path root) { |
| String stringRoot = root.toUri().getPath(); |
| if (stringRoot.contains(WILD_CARD)) { |
| int idx = stringRoot.indexOf(WILD_CARD); // first wild card in the path |
| idx = stringRoot.lastIndexOf('/', idx); // file separator right before the first wild card |
| String newRoot = stringRoot.substring(0, idx); |
| return DrillFileSystemUtil.createPathSafe(newRoot); |
| } else { |
| return new Path(stringRoot); |
| } |
| } |
| |
| public static String removeLeadingSlash(String path) { |
| if (!path.isEmpty() && path.charAt(0) == '/') { |
| String newPath = path.substring(1); |
| return removeLeadingSlash(newPath); |
| } else { |
| return path; |
| } |
| } |
| |
| /** |
| * Check if the path is a valid sub path under the parent after removing backpaths. Throw an exception if |
| * it is not. We pass subpath in as a parameter only for the error message |
| * |
| * @param parent The parent path (the workspace directory). |
| * @param combinedPath The workspace directory and (relative) subpath path combined. |
| * @param subpath For error message only, the subpath |
| */ |
| public static void checkBackPaths(String parent, String combinedPath, String subpath) { |
| Preconditions.checkArgument(!parent.isEmpty(), "Invalid root (" + parent + ") in file selection path."); |
| Preconditions.checkArgument(!combinedPath.isEmpty(), "Empty path (" + combinedPath + "( in file selection path."); |
| |
| if (!combinedPath.startsWith(parent)) { |
| throw new IllegalArgumentException( |
| String.format("Invalid path [%s] takes you outside the workspace.", subpath)); |
| } |
| } |
| |
| public List<FileStatus> getFileStatuses() { |
| return statuses; |
| } |
| |
| public boolean supportsDirPruning() { |
| if (isExpandedFully() || isExpandedPartial()) { |
| if (!wasAllPartitionsPruned) { |
| return true; |
| } |
| } |
| return false; |
| } |
| |
| public void setHadWildcard(boolean wc) { |
| this.hadWildcard = wc; |
| } |
| |
| public boolean hadWildcard() { |
| return this.hadWildcard; |
| } |
| |
| public Path getCacheFileRoot() { |
| return cacheFileRoot; |
| } |
| |
| public void setMetaContext(MetadataContext context) { |
| metaContext = context; |
| } |
| |
| public MetadataContext getMetaContext() { |
| return metaContext; |
| } |
| |
| /** |
| * @return true if this {@link FileSelection#selectionRoot} points to an empty directory, false otherwise |
| */ |
| public boolean isEmptyDirectory() { |
| return emptyDirectory; |
| } |
| |
| /** |
| * Setting {@link FileSelection#emptyDirectory} as true allows to identify this {@link FileSelection#selectionRoot} |
| * as an empty directory |
| */ |
| public void setEmptyDirectoryStatus() { |
| this.emptyDirectory = true; |
| } |
| |
| @Override |
| public String toString() { |
| StringBuilder sb = new StringBuilder(); |
| sb.append("root=").append(selectionRoot); |
| sb.append("files=["); |
| sb.append(getFiles().stream() |
| .map(Path::toString) |
| .collect(Collectors.joining(", "))); |
| sb.append("]"); |
| return sb.toString(); |
| } |
| } |