/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.hdfs;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SeekableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.FileAlreadyExistsException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.beam.sdk.io.FileSystem;
import org.apache.beam.sdk.io.fs.CreateOptions;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
import org.apache.beam.sdk.io.fs.MatchResult.Status;
import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Adapts {@link org.apache.hadoop.fs.FileSystem} connectors to be used as Apache Beam {@link
* FileSystem FileSystems}.
*
 * <p>The following Hadoop FileSystem implementations are known to be unsupported:
*
* <ul>
* <li>FTPFileSystem: Missing seek support within FTPInputStream
* </ul>
*
* <p>This implementation assumes that the underlying Hadoop {@link FileSystem} is seek efficient
 * when reading. The source code for the following {@link FSInputStream} implementations (as of
 * Hadoop 2.7.1) provides seek implementations:
*
* <ul>
* <li>HarFsInputStream
* <li>S3InputStream
* <li>DFSInputStream
* <li>SwiftNativeInputStream
* <li>NativeS3FsInputStream
* <li>LocalFSFileInputStream
* <li>NativeAzureFsInputStream
* <li>S3AInputStream
* </ul>
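 *
 * <p>A minimal sketch of wiring this adapter into a pipeline, assuming the {@code
 * HadoopFileSystemOptions} provided by this module (shown for illustration only):
 *
 * <pre>{@code
 * HadoopFileSystemOptions options =
 *     PipelineOptionsFactory.fromArgs(args).as(HadoopFileSystemOptions.class);
 * options.setHdfsConfiguration(Collections.singletonList(new Configuration()));
 * Pipeline p = Pipeline.create(options);
 * // Paths such as "hdfs://namenode/path" used by TextIO/FileIO now resolve through this adapter.
 * }</pre>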
*/
class HadoopFileSystem extends FileSystem<HadoopResourceId> {
private static final Logger LOG = LoggerFactory.getLogger(HadoopFileSystem.class);
@VisibleForTesting static final String LOG_CREATE_DIRECTORY = "Creating directory %s";
@VisibleForTesting static final String LOG_DELETING_EXISTING_FILE = "Deleting existing file %s";
@VisibleForTesting final org.apache.hadoop.fs.FileSystem fileSystem;
HadoopFileSystem(Configuration configuration) throws IOException {
this.fileSystem = org.apache.hadoop.fs.FileSystem.newInstance(configuration);
}
@Override
protected List<MatchResult> match(List<String> specs) {
ImmutableList.Builder<MatchResult> resultsBuilder = ImmutableList.builder();
for (String spec : specs) {
try {
Set<Metadata> metadata = new HashSet<>();
FileStatus[] fileStatuses = fileSystem.globStatus(new Path(spec));
if (fileStatuses != null) {
for (FileStatus fileStatus : fileStatuses) {
if (fileStatus.isFile()) {
metadata.add(toMetadata(fileStatus));
}
}
}
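// Hadoop glob expansion has no recursive "**" wildcard, so it is expanded manually below by
// walking sub-directories: e.g. "/dir/**/part-*" is split into the directorySpec "/dir/*" and
// the fileSpec "*/part-*" before calling matchRecursiveGlob.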
if (spec.contains("**")) {
int index = spec.indexOf("**");
metadata.addAll(
matchRecursiveGlob(spec.substring(0, index + 1), spec.substring(index + 1)));
}
if (metadata.isEmpty()) {
resultsBuilder.add(MatchResult.create(Status.NOT_FOUND, Collections.emptyList()));
} else {
resultsBuilder.add(MatchResult.create(Status.OK, new ArrayList<>(metadata)));
}
} catch (IOException e) {
resultsBuilder.add(MatchResult.create(Status.ERROR, e));
}
}
return resultsBuilder.build();
}
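/**
 * Expands a recursive glob by matching {@code fileSpec} directly under {@code directorySpec} and
 * then recursing into every sub-directory of {@code directorySpec}. Wildcards within
 * {@code directorySpec} are first resolved to concrete directories via globStatus.
 */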
private Set<Metadata> matchRecursiveGlob(String directorySpec, String fileSpec)
throws IOException {
Set<Metadata> metadata = new HashSet<>();
if (directorySpec.contains("*")) {
// A directory spec containing a wildcard is expanded into the concrete directories to search.
FileStatus[] directoryStatuses = fileSystem.globStatus(new Path(directorySpec));
for (FileStatus directoryStatus : directoryStatuses) {
if (directoryStatus.isDirectory()) {
metadata.addAll(
matchRecursiveGlob(directoryStatus.getPath().toUri().toString(), fileSpec));
}
}
} else {
// A concrete directory is searched.
FileStatus[] fileStatuses = fileSystem.globStatus(new Path(directorySpec + "/" + fileSpec));
for (FileStatus fileStatus : fileStatuses) {
if (fileStatus.isFile()) {
metadata.add(toMetadata(fileStatus));
}
}
// All sub-directories of a concrete directory are searched.
FileStatus[] directoryStatuses = fileSystem.globStatus(new Path(directorySpec + "/*"));
for (FileStatus directoryStatus : directoryStatuses) {
if (directoryStatus.isDirectory()) {
metadata.addAll(
matchRecursiveGlob(directoryStatus.getPath().toUri().toString(), fileSpec));
}
}
// Handle additional instances of recursive globs.
if (fileSpec.contains("**")) {
int index = fileSpec.indexOf("**");
metadata.addAll(
matchRecursiveGlob(
directorySpec + "/" + fileSpec.substring(0, index + 1),
fileSpec.substring(index + 1)));
}
}
return metadata;
}
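/**
 * Converts a Hadoop {@link FileStatus} into Beam {@link Metadata}. Reads are always marked as
 * seek efficient; see the class javadoc for the supported file systems.
 */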
private Metadata toMetadata(FileStatus fileStatus) {
URI uri = dropEmptyAuthority(fileStatus.getPath().toUri().toString());
return Metadata.builder()
.setResourceId(new HadoopResourceId(uri))
.setIsReadSeekEfficient(true)
.setSizeBytes(fileStatus.getLen())
.setLastModifiedMillis(fileStatus.getModificationTime())
.build();
}
@Override
protected WritableByteChannel create(HadoopResourceId resourceId, CreateOptions createOptions)
throws IOException {
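// The CreateOptions (e.g. the MIME type) are not used here; fileSystem.create(Path) overwrites
// any existing file at the given path.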
return Channels.newChannel(fileSystem.create(resourceId.toPath()));
}
@Override
protected ReadableByteChannel open(HadoopResourceId resourceId) throws IOException {
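// Fetch the file status up front so that size() on the returned channel needs no further round
// trips; this also surfaces a FileNotFoundException early if the resource does not exist.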
FileStatus fileStatus = fileSystem.getFileStatus(resourceId.toPath());
return new HadoopSeekableByteChannel(fileStatus, fileSystem.open(resourceId.toPath()));
}
@Override
protected void copy(List<HadoopResourceId> srcResourceIds, List<HadoopResourceId> destResourceIds)
throws IOException {
for (int i = 0; i < srcResourceIds.size(); ++i) {
// Unfortunately, Hadoop FileSystems don't expose a native copy operation, so we are forced
// to use the inefficient implementation found in FileUtil, which copies all the bytes through
// the local machine.
//
// The Hadoop FileSystem API does define a concat method, but only the DFSFileSystem could be
// found implementing it, and its concat deletes the sources afterwards, which is not what we
// want. All the other FileSystem implementations inspected threw UnsupportedOperationException
// from concat.
boolean success =
FileUtil.copy(
fileSystem,
srcResourceIds.get(i).toPath(),
fileSystem,
destResourceIds.get(i).toPath(),
false,
true,
fileSystem.getConf());
if (!success) {
// Defensive coding as this should not happen in practice
throw new IOException(
String.format(
"Unable to copy resource %s to %s. No further information provided by underlying filesystem.",
srcResourceIds.get(i).toPath(), destResourceIds.get(i).toPath()));
}
}
}
/**
* Renames a {@link List} of file-like resources from one location to another.
*
* <p>The number of source resources must equal the number of destination resources. Destination
* resources will be created recursively.
*
* @param srcResourceIds the references of the source resources
* @param destResourceIds the references of the destination resources
* @throws FileNotFoundException if the source resources are missing. When rename throws, the
* state of the resources is unknown but safe: for every (source, destination) pair of
* resources, the following are possible: a) source exists, b) destination exists, c) source
 * and destination both exist. Thus no data is lost; however, duplicated resources are
 * possible. In such scenarios, callers can use {@code match()} to determine the state of the
 * resources.
* @throws FileAlreadyExistsException if a target resource already exists and couldn't be
* overwritten.
* @throws IOException if the underlying filesystem indicates the rename was not performed but no
* other errors were thrown.
*/
@Override
protected void rename(
List<HadoopResourceId> srcResourceIds, List<HadoopResourceId> destResourceIds)
throws IOException {
for (int i = 0; i < srcResourceIds.size(); ++i) {
Path src = srcResourceIds.get(i).toPath();
Path dest = destResourceIds.get(i).toPath();
// rename in HDFS silently fails unless the target directory exists, so create it first (BEAM-4861)
mkdirs(dest);
boolean success = fileSystem.rename(src, dest);
// If the failure was due to the file already existing, delete and retry (BEAM-5036).
// This should be the exceptional case, so handle here rather than incur the overhead of
// testing first
if (!success && fileSystem.exists(src) && fileSystem.exists(dest)) {
LOG.debug(
String.format(LOG_DELETING_EXISTING_FILE, Path.getPathWithoutSchemeAndAuthority(dest)));
fileSystem.delete(dest, false); // not recursive
success = fileSystem.rename(src, dest);
}
if (!success) {
if (!fileSystem.exists(src)) {
throw new FileNotFoundException(
String.format("Unable to rename resource %s to %s as source not found.", src, dest));
} else if (fileSystem.exists(dest)) {
throw new FileAlreadyExistsException(
String.format(
"Unable to rename resource %s to %s as destination already exists and couldn't be deleted.",
src, dest));
} else {
throw new IOException(
String.format(
"Unable to rename resource %s to %s. No further information provided by underlying filesystem.",
src, dest));
}
}
}
}
/** Ensures that the target directory exists for the given filePath. */
private void mkdirs(Path filePath) throws IOException {
Path targetDirectory = filePath.getParent();
if (!fileSystem.exists(targetDirectory)) {
LOG.debug(
String.format(
LOG_CREATE_DIRECTORY, Path.getPathWithoutSchemeAndAuthority(targetDirectory)));
boolean success = fileSystem.mkdirs(targetDirectory);
if (!success) {
throw new IOException(
String.format(
"Unable to create target directory %s. No further information provided by underlying filesystem.",
targetDirectory));
}
}
}
@Override
protected void delete(Collection<HadoopResourceId> resourceIds) throws IOException {
for (HadoopResourceId resourceId : resourceIds) {
// Ignore the boolean return value; failures are surfaced as exceptions.
fileSystem.delete(resourceId.toPath(), false);
}
}
@Override
protected HadoopResourceId matchNewResource(String singleResourceSpec, boolean isDirectory) {
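// Beam represents directory resources with a trailing '/': reject file specs that already end
// in '/' and append the '/' to directory specs that lack it.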
if (singleResourceSpec.endsWith("/") && !isDirectory) {
throw new IllegalArgumentException(
String.format("Expected file path but received directory path %s", singleResourceSpec));
}
return !singleResourceSpec.endsWith("/") && isDirectory
? new HadoopResourceId(dropEmptyAuthority(singleResourceSpec + "/"))
: new HadoopResourceId(dropEmptyAuthority(singleResourceSpec));
}
@Override
protected String getScheme() {
return fileSystem.getScheme();
}
/** An adapter around {@link FSDataInputStream} that implements {@link SeekableByteChannel}. */
private static class HadoopSeekableByteChannel implements SeekableByteChannel {
private final FileStatus fileStatus;
private final FSDataInputStream inputStream;
private boolean closed;
private HadoopSeekableByteChannel(FileStatus fileStatus, FSDataInputStream inputStream) {
this.fileStatus = fileStatus;
this.inputStream = inputStream;
this.closed = false;
}
@Override
public int read(ByteBuffer dst) throws IOException {
if (closed) {
throw new IOException("Channel is closed");
}
// 0-length reads must be supported
int read = 0;
// We avoid using the ByteBuffer-based read for Hadoop because some FSDataInputStream
// implementations are not ByteBufferReadable.
// See https://issues.apache.org/jira/browse/HADOOP-14603
if (dst.hasArray()) {
// does the same as inputStream.read(dst):
// stores up to dst.remaining() bytes into dst.array() starting at dst.position().
// But dst may have an offset into its backing array, hence the + dst.arrayOffset()
read = inputStream.read(dst.array(), dst.position() + dst.arrayOffset(), dst.remaining());
} else {
// TODO: Add support for off-heap ByteBuffers in case the underlying FSDataInputStream
// does not support reading from a ByteBuffer.
read = inputStream.read(dst);
}
if (read > 0) {
dst.position(dst.position() + read);
}
return read;
}
@Override
public int write(ByteBuffer src) {
throw new UnsupportedOperationException();
}
@Override
public long position() throws IOException {
if (closed) {
throw new IOException("Channel is closed");
}
return inputStream.getPos();
}
@Override
public SeekableByteChannel position(long newPosition) throws IOException {
if (closed) {
throw new IOException("Channel is closed");
}
inputStream.seek(newPosition);
return this;
}
@Override
public long size() throws IOException {
if (closed) {
throw new IOException("Channel is closed");
}
return fileStatus.getLen();
}
@Override
public SeekableByteChannel truncate(long size) {
throw new UnsupportedOperationException();
}
@Override
public boolean isOpen() {
return !closed;
}
@Override
public void close() throws IOException {
closed = true;
inputStream.close();
}
}
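/**
 * Drops an empty authority from a URI string, e.g. rewriting {@code hdfs:///tmp/file} to the
 * equivalent {@code hdfs:/tmp/file} form.
 */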
private static URI dropEmptyAuthority(String uriStr) {
URI uri = URI.create(uriStr);
String prefix = uri.getScheme() + ":///";
if (uriStr.startsWith(prefix)) {
return URI.create(uri.getScheme() + ":/" + uriStr.substring(prefix.length()));
} else {
return uri;
}
}
}