blob: 7f8271cdf3c8146dd94287c9018411b0c2733fe1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Verify.verify;
import java.io.IOException;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SeekableByteChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.ListIterator;
import java.util.NoSuchElementException;
import javax.annotation.Nullable;
import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A common base class for all file-based {@link Source}s. Extend this class to implement your own
* file-based custom source.
*
* <p>A file-based {@code Source} is a {@code Source} backed by a file pattern defined as a Java
* glob, a single file, or a offset range for a single file. See {@link OffsetBasedSource} and
* {@link org.apache.beam.sdk.io.range.RangeTracker} for semantics of offset ranges.
*
* <p>This source stores a {@code String} that is a {@link FileSystems} specification for a file or
* file pattern. There should be a {@link FileSystem} registered for the file specification
* provided. Please refer to {@link FileSystems} and {@link FileSystem} for more information on
* this.
*
* <p>In addition to the methods left abstract from {@code BoundedSource}, subclasses must implement
* methods to create a sub-source and a reader for a range of a single file - {@link
* #createForSubrangeOfFile} and {@link #createSingleFileReader}. Please refer to {@link TextIO
* TextIO.TextSource} for an example implementation of {@code FileBasedSource}.
*
* @param <T> Type of records represented by the source.
*/
public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
private static final Logger LOG = LoggerFactory.getLogger(FileBasedSource.class);
private final ValueProvider<String> fileOrPatternSpec;
private final EmptyMatchTreatment emptyMatchTreatment;
@Nullable private MatchResult.Metadata singleFileMetadata;
private final Mode mode;
/** A given {@code FileBasedSource} represents a file resource of one of these types. */
public enum Mode {
FILEPATTERN,
SINGLE_FILE_OR_SUBRANGE
}
/**
* Create a {@code FileBaseSource} based on a file or a file pattern specification, with the given
* strategy for treating filepatterns that do not match any files.
*/
protected FileBasedSource(
ValueProvider<String> fileOrPatternSpec,
EmptyMatchTreatment emptyMatchTreatment,
long minBundleSize) {
super(0, Long.MAX_VALUE, minBundleSize);
this.mode = Mode.FILEPATTERN;
this.emptyMatchTreatment = emptyMatchTreatment;
this.fileOrPatternSpec = fileOrPatternSpec;
}
/**
* Like {@link #FileBasedSource(ValueProvider, EmptyMatchTreatment, long)}, but uses the default
* value of {@link EmptyMatchTreatment#DISALLOW}.
*/
protected FileBasedSource(ValueProvider<String> fileOrPatternSpec, long minBundleSize) {
this(fileOrPatternSpec, EmptyMatchTreatment.DISALLOW, minBundleSize);
}
/**
* Create a {@code FileBasedSource} based on a single file. This constructor must be used when
* creating a new {@code FileBasedSource} for a subrange of a single file. Additionally, this
* constructor must be used to create new {@code FileBasedSource}s when subclasses implement the
* method {@link #createForSubrangeOfFile}.
*
* <p>See {@link OffsetBasedSource} for detailed descriptions of {@code minBundleSize}, {@code
* startOffset}, and {@code endOffset}.
*
* @param fileMetadata specification of the file represented by the {@link FileBasedSource}, in
* suitable form for use with {@link FileSystems#match(List)}.
* @param minBundleSize minimum bundle size in bytes.
* @param startOffset starting byte offset.
* @param endOffset ending byte offset. If the specified value {@code >= #getMaxEndOffset()} it
* implies {@code #getMaxEndOffSet()}.
*/
protected FileBasedSource(
Metadata fileMetadata, long minBundleSize, long startOffset, long endOffset) {
super(startOffset, endOffset, minBundleSize);
mode = Mode.SINGLE_FILE_OR_SUBRANGE;
this.singleFileMetadata = checkNotNull(fileMetadata, "fileMetadata");
this.fileOrPatternSpec = StaticValueProvider.of(fileMetadata.resourceId().toString());
// This field will be unused in this mode.
this.emptyMatchTreatment = EmptyMatchTreatment.DISALLOW;
}
/**
* Returns the information about the single file that this source is reading from.
*
* @throws IllegalArgumentException if this source is in {@link Mode#FILEPATTERN} mode.
*/
protected final MatchResult.Metadata getSingleFileMetadata() {
checkArgument(
mode == Mode.SINGLE_FILE_OR_SUBRANGE,
"This function should only be called for a single file, not %s",
this);
checkState(
singleFileMetadata != null,
"It should not be possible to construct a %s in mode %s with null metadata: %s",
FileBasedSource.class,
mode,
this);
return singleFileMetadata;
}
public final String getFileOrPatternSpec() {
return fileOrPatternSpec.get();
}
public final ValueProvider<String> getFileOrPatternSpecProvider() {
return fileOrPatternSpec;
}
public final EmptyMatchTreatment getEmptyMatchTreatment() {
return emptyMatchTreatment;
}
public final Mode getMode() {
return mode;
}
@Override
public final FileBasedSource<T> createSourceForSubrange(long start, long end) {
checkArgument(
mode != Mode.FILEPATTERN, "Cannot split a file pattern based source based on positions");
checkArgument(
start >= getStartOffset(),
"Start offset value %s of the subrange cannot be smaller than the start offset value %s"
+ " of the parent source",
start,
getStartOffset());
checkArgument(
end <= getEndOffset(),
"End offset value %s of the subrange cannot be larger than the end offset value %s",
end,
getEndOffset());
checkState(
singleFileMetadata != null, "A single file source should not have null metadata: %s", this);
FileBasedSource<T> source = createForSubrangeOfFile(singleFileMetadata, start, end);
if (start > 0 || end != Long.MAX_VALUE) {
checkArgument(
source.getMode() == Mode.SINGLE_FILE_OR_SUBRANGE,
"Source created for the range [%s,%s) must be a subrange source",
start,
end);
}
return source;
}
/**
* Creates and returns a new {@code FileBasedSource} of the same type as the current {@code
* FileBasedSource} backed by a given file and an offset range. When current source is being
* split, this method is used to generate new sub-sources. When creating the source subclasses
* must call the constructor {@link #FileBasedSource(Metadata, long, long, long)} of {@code
* FileBasedSource} with corresponding parameter values passed here.
*
* @param fileMetadata file backing the new {@code FileBasedSource}.
* @param start starting byte offset of the new {@code FileBasedSource}.
* @param end ending byte offset of the new {@code FileBasedSource}. May be Long.MAX_VALUE, in
* which case it will be inferred using {@link FileBasedSource#getMaxEndOffset}.
*/
protected abstract FileBasedSource<T> createForSubrangeOfFile(
Metadata fileMetadata, long start, long end);
/**
* Creates and returns an instance of a {@code FileBasedReader} implementation for the current
* source assuming the source represents a single file. File patterns will be handled by {@code
* FileBasedSource} implementation automatically.
*/
protected abstract FileBasedReader<T> createSingleFileReader(PipelineOptions options);
@Override
public final long getEstimatedSizeBytes(PipelineOptions options) throws IOException {
// This implementation of method getEstimatedSizeBytes is provided to simplify subclasses. Here
// we perform the size estimation of files and file patterns using the interface provided by
// FileSystem.
String fileOrPattern = fileOrPatternSpec.get();
if (mode == Mode.FILEPATTERN) {
long totalSize = 0;
List<Metadata> allMatches = FileSystems.match(fileOrPattern, emptyMatchTreatment).metadata();
for (Metadata metadata : allMatches) {
totalSize += metadata.sizeBytes();
}
LOG.info(
"Filepattern {} matched {} files with total size {}",
fileOrPattern,
allMatches.size(),
totalSize);
return totalSize;
} else {
long start = getStartOffset();
long end = Math.min(getEndOffset(), getMaxEndOffset(options));
return end - start;
}
}
@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
if (mode == Mode.FILEPATTERN) {
builder.add(
DisplayData.item("filePattern", getFileOrPatternSpecProvider())
.withLabel("File Pattern"));
}
}
@Override
public final List<? extends FileBasedSource<T>> split(
long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
// This implementation of method split is provided to simplify subclasses. Here we
// split a FileBasedSource based on a file pattern to FileBasedSources based on full single
// files. For files that can be efficiently seeked, we further split FileBasedSources based on
// those files to FileBasedSources based on sub ranges of single files.
String fileOrPattern = fileOrPatternSpec.get();
if (mode == Mode.FILEPATTERN) {
long startTime = System.currentTimeMillis();
List<Metadata> expandedFiles =
FileSystems.match(fileOrPattern, emptyMatchTreatment).metadata();
List<FileBasedSource<T>> splitResults = new ArrayList<>(expandedFiles.size());
for (Metadata metadata : expandedFiles) {
FileBasedSource<T> split = createForSubrangeOfFile(metadata, 0, metadata.sizeBytes());
verify(
split.getMode() == Mode.SINGLE_FILE_OR_SUBRANGE,
"%s.createForSubrangeOfFile must return a source in mode %s",
split,
Mode.SINGLE_FILE_OR_SUBRANGE);
// The split is NOT in FILEPATTERN mode, so we can call its split without fear
// of recursion. This will break a single file into multiple splits when the file is
// splittable and larger than the desired bundle size.
splitResults.addAll(split.split(desiredBundleSizeBytes, options));
}
LOG.info(
"Splitting filepattern {} into bundles of size {} took {} ms "
+ "and produced {} files and {} bundles",
fileOrPattern,
desiredBundleSizeBytes,
System.currentTimeMillis() - startTime,
expandedFiles.size(),
splitResults.size());
return splitResults;
} else {
if (isSplittable()) {
@SuppressWarnings("unchecked")
List<FileBasedSource<T>> splits =
(List<FileBasedSource<T>>) super.split(desiredBundleSizeBytes, options);
return splits;
} else {
LOG.debug(
"The source for file {} is not split into sub-range based sources since "
+ "the file is not seekable",
fileOrPattern);
return ImmutableList.of(this);
}
}
}
/**
* Determines whether a file represented by this source is can be split into bundles.
*
* <p>By default, a source in mode {@link Mode#FILEPATTERN} is always splittable, because
* splitting will involve expanding the file pattern and producing single-file/subrange sources,
* which may or may not be splittable themselves.
*
* <p>By default, a source in {@link Mode#SINGLE_FILE_OR_SUBRANGE} is splittable if it is on a
* file system that supports efficient read seeking.
*
* <p>Subclasses may override to provide different behavior.
*/
protected boolean isSplittable() throws Exception {
if (mode == Mode.FILEPATTERN) {
// split will expand file pattern and return single file or subrange sources that
// may or may not be splittable.
return true;
}
return getSingleFileMetadata().isReadSeekEfficient();
}
@Override
public final BoundedReader<T> createReader(PipelineOptions options) throws IOException {
// Validate the current source prior to creating a reader for it.
this.validate();
String fileOrPattern = fileOrPatternSpec.get();
if (mode == Mode.FILEPATTERN) {
long startTime = System.currentTimeMillis();
List<Metadata> fileMetadata =
FileSystems.match(fileOrPattern, emptyMatchTreatment).metadata();
LOG.info("Matched {} files for pattern {}", fileMetadata.size(), fileOrPattern);
List<FileBasedReader<T>> fileReaders = new ArrayList<>();
for (Metadata metadata : fileMetadata) {
long endOffset = metadata.sizeBytes();
fileReaders.add(
createForSubrangeOfFile(metadata, 0, endOffset).createSingleFileReader(options));
}
LOG.debug(
"Creating a reader for file pattern {} took {} ms",
fileOrPattern,
System.currentTimeMillis() - startTime);
if (fileReaders.size() == 1) {
return fileReaders.get(0);
}
return new FilePatternReader(this, fileReaders);
} else {
return createSingleFileReader(options);
}
}
@Override
public String toString() {
switch (mode) {
case FILEPATTERN:
return fileOrPatternSpec.toString();
case SINGLE_FILE_OR_SUBRANGE:
return fileOrPatternSpec + " range " + super.toString();
default:
throw new IllegalStateException("Unexpected mode: " + mode);
}
}
@Override
public void validate() {
super.validate();
switch (mode) {
case FILEPATTERN:
checkArgument(
getStartOffset() == 0,
"FileBasedSource is based on a file pattern or a full single file "
+ "but the starting offset proposed %s is not zero",
getStartOffset());
checkArgument(
getEndOffset() == Long.MAX_VALUE,
"FileBasedSource is based on a file pattern or a full single file "
+ "but the ending offset proposed %s is not Long.MAX_VALUE",
getEndOffset());
break;
case SINGLE_FILE_OR_SUBRANGE:
// Nothing more to validate.
break;
default:
throw new IllegalStateException("Unknown mode: " + mode);
}
}
@Override
public final long getMaxEndOffset(PipelineOptions options) throws IOException {
checkArgument(
mode != Mode.FILEPATTERN, "Cannot determine the exact end offset of a file pattern");
Metadata metadata = getSingleFileMetadata();
return metadata.sizeBytes();
}
/**
* A {@link Source.Reader reader} that implements code common to readers of {@code
* FileBasedSource}s.
*
* <h2>Seekability</h2>
*
* <p>This reader uses a {@link ReadableByteChannel} created for the file represented by the
* corresponding source to efficiently move to the correct starting position defined in the
* source. Subclasses of this reader should implement {@link #startReading} to get access to this
* channel. If the source corresponding to the reader is for a subrange of a file the {@code
* ReadableByteChannel} provided is guaranteed to be an instance of the type {@link
* SeekableByteChannel}, which may be used by subclass to traverse back in the channel to
* determine the correct starting position.
*
* <h2>Reading Records</h2>
*
* <p>Sequential reading is implemented using {@link #readNextRecord}.
*
* <p>Then {@code FileBasedReader} implements "reading a range [A, B)" in the following way.
*
* <ol>
* <li>{@link #start} opens the file
* <li>{@link #start} seeks the {@code SeekableByteChannel} to A (reading offset ranges for
* non-seekable files is not supported) and calls {@code startReading()}
* <li>{@link #start} calls {@link #advance} once, which, via {@link #readNextRecord}, locates
* the first record which is at a split point AND its offset is at or after A. If this
* record is at or after B, {@link #advance} returns false and reading is finished.
* <li>if the previous advance call returned {@code true} sequential reading starts and {@code
* advance()} will be called repeatedly
* </ol>
*
* {@code advance()} calls {@code readNextRecord()} on the subclass, and stops (returns false) if
* the new record is at a split point AND the offset of the new record is at or after B.
*
* <h2>Thread Safety</h2>
*
* <p>Since this class implements {@link Source.Reader} it guarantees thread safety. Abstract
* methods defined here will not be accessed by more than one thread concurrently.
*/
public abstract static class FileBasedReader<T> extends OffsetBasedReader<T> {
// Initialized in startImpl
@Nullable private ReadableByteChannel channel = null;
/**
* Subclasses should not perform IO operations at the constructor. All IO operations should be
* delayed until the {@link #startReading} method is invoked.
*/
public FileBasedReader(FileBasedSource<T> source) {
super(source);
checkArgument(
source.getMode() != Mode.FILEPATTERN,
"FileBasedReader does not support reading file patterns");
}
@Override
public synchronized FileBasedSource<T> getCurrentSource() {
return (FileBasedSource<T>) super.getCurrentSource();
}
@Override
protected final boolean startImpl() throws IOException {
FileBasedSource<T> source = getCurrentSource();
this.channel = FileSystems.open(source.getSingleFileMetadata().resourceId());
if (channel instanceof SeekableByteChannel) {
SeekableByteChannel seekChannel = (SeekableByteChannel) channel;
seekChannel.position(source.getStartOffset());
} else {
// Channel is not seekable. Must not be a subrange.
checkArgument(
source.mode != Mode.SINGLE_FILE_OR_SUBRANGE,
"Subrange-based sources must only be defined for file types that support seekable "
+ " read channels");
checkArgument(
source.getStartOffset() == 0,
"Start offset %s is not zero but channel for reading the file is not seekable.",
source.getStartOffset());
}
startReading(channel);
// Advance once to load the first record.
return advanceImpl();
}
@Override
protected final boolean advanceImpl() throws IOException {
return readNextRecord();
}
/**
* Closes any {@link ReadableByteChannel} created for the current reader. This implementation is
* idempotent. Any {@code close()} method introduced by a subclass must be idempotent and must
* call the {@code close()} method in the {@code FileBasedReader}.
*/
@Override
public void close() throws IOException {
if (channel != null) {
channel.close();
}
}
@Override
public boolean allowsDynamicSplitting() {
try {
return getCurrentSource().isSplittable();
} catch (Exception e) {
throw new RuntimeException(
String.format("Error determining if %s allows dynamic splitting", this), e);
}
}
/**
* Performs any initialization of the subclass of {@code FileBasedReader} that involves IO
* operations. Will only be invoked once and before that invocation the base class will seek the
* channel to the source's starting offset.
*
* <p>Provided {@link ReadableByteChannel} is for the file represented by the source of this
* reader. Subclass may use the {@code channel} to build a higher level IO abstraction, e.g., a
* BufferedReader or an XML parser.
*
* <p>If the corresponding source is for a subrange of a file, {@code channel} is guaranteed to
* be an instance of the type {@link SeekableByteChannel}.
*
* <p>After this method is invoked the base class will not be reading data from the channel or
* adjusting the position of the channel. But the base class is responsible for properly closing
* the channel.
*
* @param channel a byte channel representing the file backing the reader.
*/
protected abstract void startReading(ReadableByteChannel channel) throws IOException;
/**
* Reads the next record from the channel provided by {@link #startReading}. Methods {@link
* #getCurrent}, {@link #getCurrentOffset}, and {@link #isAtSplitPoint()} should return the
* corresponding information about the record read by the last invocation of this method.
*
* <p>Note that this method will be called the same way for reading the first record in the
* source (file or offset range in the file) and for reading subsequent records. It is up to the
* subclass to do anything special for locating and reading the first record, if necessary.
*
* @return {@code true} if a record was successfully read, {@code false} if the end of the
* channel was reached before successfully reading a new record.
*/
protected abstract boolean readNextRecord() throws IOException;
}
// An internal Reader implementation that concatenates a sequence of FileBasedReaders.
private class FilePatternReader extends BoundedReader<T> {
private final FileBasedSource<T> source;
private final List<FileBasedReader<T>> fileReaders;
final ListIterator<FileBasedReader<T>> fileReadersIterator;
// Initialized in start
@Nullable FileBasedReader<T> currentReader = null;
public FilePatternReader(FileBasedSource<T> source, List<FileBasedReader<T>> fileReaders) {
this.source = source;
this.fileReaders = fileReaders;
this.fileReadersIterator = fileReaders.listIterator();
}
@Override
public boolean start() throws IOException {
return startNextNonemptyReader();
}
@Override
public boolean advance() throws IOException {
checkState(currentReader != null, "Call start() before advance()");
if (currentReader.advance()) {
return true;
}
return startNextNonemptyReader();
}
private boolean startNextNonemptyReader() throws IOException {
while (fileReadersIterator.hasNext()) {
currentReader = fileReadersIterator.next();
if (currentReader.start()) {
return true;
}
currentReader.close();
}
return false;
}
@Override
public T getCurrent() throws NoSuchElementException {
// A NoSuchElement will be thrown by the last FileBasedReader if getCurrent() is called after
// advance() returns false.
return currentReader.getCurrent();
}
@Override
public Instant getCurrentTimestamp() throws NoSuchElementException {
// A NoSuchElement will be thrown by the last FileBasedReader if getCurrentTimestamp()
// is called after advance() returns false.
return currentReader.getCurrentTimestamp();
}
@Override
public void close() throws IOException {
// Close all readers that may have not yet been closed.
// If this reader has not been started, currentReader is null.
if (currentReader != null) {
currentReader.close();
}
while (fileReadersIterator.hasNext()) {
fileReadersIterator.next().close();
}
}
@Override
public FileBasedSource<T> getCurrentSource() {
return source;
}
@Override
public FileBasedSource<T> splitAtFraction(double fraction) {
// Unsupported. TODO: implement.
LOG.debug("Dynamic splitting of FilePatternReader is unsupported.");
return null;
}
@Override
public Double getFractionConsumed() {
if (currentReader == null) {
return 0.0;
}
if (fileReaders.isEmpty()) {
return 1.0;
}
int index = fileReadersIterator.previousIndex();
int numReaders = fileReaders.size();
if (index == numReaders) {
return 1.0;
}
double before = 1.0 * index / numReaders;
double after = 1.0 * (index + 1) / numReaders;
Double fractionOfCurrentReader = currentReader.getFractionConsumed();
if (fractionOfCurrentReader == null) {
return before;
}
return before + fractionOfCurrentReader * (after - before);
}
}
}