blob: 7b10174a38f879b39a5e417f3deaeb568dd91866 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.io.range.OffsetRangeTracker;
import org.apache.beam.sdk.io.range.RangeTracker;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A {@link BoundedSource} that uses offsets to define starting and ending positions.
*
* <p>{@link OffsetBasedSource} is a common base class for all bounded sources where the input can
* be represented as a single range, and an input can be efficiently processed in parallel by
* splitting the range into a set of disjoint ranges whose union is the original range. This class
* should be used for sources that can be cheaply read starting at any given offset. {@link
* OffsetBasedSource} stores the range and implements splitting into bundles.
*
* <p>Extend {@link OffsetBasedSource} to implement your own offset-based custom source. {@link
* FileBasedSource}, which is a subclass of this, adds additional functionality useful for custom
* sources that are based on files. If possible implementors should start from {@link
* FileBasedSource} instead of {@link OffsetBasedSource}.
*
* <p>Consult {@link RangeTracker} for important semantics common to all sources defined by a range
* of positions of a certain type, including the semantics of split points ({@link
* OffsetBasedReader#isAtSplitPoint}).
*
* @param <T> Type of records represented by the source.
* @see BoundedSource
* @see FileBasedSource
* @see RangeTracker
*/
public abstract class OffsetBasedSource<T> extends BoundedSource<T> {
private final long startOffset;
private final long endOffset;
private final long minBundleSize;
/**
* @param startOffset starting offset (inclusive) of the source. Must be non-negative.
* @param endOffset ending offset (exclusive) of the source. Use {@link Long#MAX_VALUE} to
* indicate that the entire source after {@code startOffset} should be read. Must be {@code >
* startOffset}.
* @param minBundleSize minimum bundle size in offset units that should be used when splitting the
* source into sub-sources. This value may not be respected if the total range of the source
* is smaller than the specified {@code minBundleSize}. Must be non-negative.
*/
public OffsetBasedSource(long startOffset, long endOffset, long minBundleSize) {
this.startOffset = startOffset;
this.endOffset = endOffset;
this.minBundleSize = minBundleSize;
}
/** Returns the starting offset of the source. */
public long getStartOffset() {
return startOffset;
}
/**
* Returns the specified ending offset of the source. Any returned value greater than or equal to
* {@link #getMaxEndOffset(PipelineOptions)} should be treated as {@link
* #getMaxEndOffset(PipelineOptions)}.
*/
public long getEndOffset() {
return endOffset;
}
/**
* Returns the minimum bundle size that should be used when splitting the source into sub-sources.
* This value may not be respected if the total range of the source is smaller than the specified
* {@code minBundleSize}.
*/
public long getMinBundleSize() {
return minBundleSize;
}
@Override
public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
long trueEndOffset = (endOffset == Long.MAX_VALUE) ? getMaxEndOffset(options) : endOffset;
return getBytesPerOffset() * (trueEndOffset - getStartOffset());
}
@Override
public List<? extends OffsetBasedSource<T>> split(
long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
// Split the range into bundles based on the desiredBundleSizeBytes. If the desired bundle
// size is smaller than the minBundleSize of the source then minBundleSize will be used instead.
long desiredBundleSizeOffsetUnits =
Math.max(Math.max(1, desiredBundleSizeBytes / getBytesPerOffset()), minBundleSize);
List<OffsetBasedSource<T>> subSources = new ArrayList<>();
for (OffsetRange range :
new OffsetRange(startOffset, Math.min(endOffset, getMaxEndOffset(options)))
.split(desiredBundleSizeOffsetUnits, minBundleSize)) {
subSources.add(createSourceForSubrange(range.getFrom(), range.getTo()));
}
return subSources;
}
@Override
public void validate() {
checkArgument(
this.startOffset >= 0, "Start offset has value %s, must be non-negative", this.startOffset);
checkArgument(
this.endOffset >= 0, "End offset has value %s, must be non-negative", this.endOffset);
checkArgument(
this.startOffset <= this.endOffset,
"Start offset %s may not be larger than end offset %s",
this.startOffset,
this.endOffset);
checkArgument(
this.minBundleSize >= 0,
"minBundleSize has value %s, must be non-negative",
this.minBundleSize);
}
@Override
public String toString() {
return "[" + startOffset + ", " + endOffset + ")";
}
/**
* Returns approximately how many bytes of data correspond to a single offset in this source. Used
* for translation between this source's range and methods defined in terms of bytes, such as
* {@link #getEstimatedSizeBytes} and {@link #split}.
*
* <p>Defaults to {@code 1} byte, which is the common case for, e.g., file sources.
*/
public long getBytesPerOffset() {
return 1L;
}
/**
* Returns the actual ending offset of the current source. The value returned by this function
* will be used to clip the end of the range {@code [startOffset, endOffset)} such that the range
* used is {@code [startOffset, min(endOffset, maxEndOffset))}.
*
* <p>As an example in which {@link OffsetBasedSource} is used to implement a file source, suppose
* that this source was constructed with an {@code endOffset} of {@link Long#MAX_VALUE} to
* indicate that a file should be read to the end. Then this function should determine the actual,
* exact size of the file in bytes and return it.
*/
public abstract long getMaxEndOffset(PipelineOptions options) throws Exception;
/**
* Returns an {@link OffsetBasedSource} for a subrange of the current source. The subrange {@code
* [start, end)} must be within the range {@code [startOffset, endOffset)} of the current source,
* i.e. {@code startOffset <= start < end <= endOffset}.
*/
public abstract OffsetBasedSource<T> createSourceForSubrange(long start, long end);
@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
builder
.addIfNotDefault(
DisplayData.item("minBundleSize", minBundleSize).withLabel("Minimum Bundle Size"), 1L)
.addIfNotDefault(
DisplayData.item("startOffset", startOffset).withLabel("Start Read Offset"), 0L)
.addIfNotDefault(
DisplayData.item("endOffset", endOffset).withLabel("End Read Offset"), Long.MAX_VALUE);
}
/**
* A {@link Source.Reader} that implements code common to readers of all {@link
* OffsetBasedSource}s.
*
* <p>Subclasses have to implement:
*
* <ul>
* <li>The methods {@link #startImpl} and {@link #advanceImpl} for reading the first or
* subsequent records.
* <li>The methods {@link #getCurrent}, {@link #getCurrentOffset}, and optionally {@link
* #isAtSplitPoint} and {@link #getCurrentTimestamp} to access properties of the last record
* successfully read by {@link #startImpl} or {@link #advanceImpl}.
* </ul>
*/
public abstract static class OffsetBasedReader<T> extends BoundedReader<T> {
private static final Logger LOG = LoggerFactory.getLogger(OffsetBasedReader.class);
private OffsetBasedSource<T> source;
/** Returns true if the last call to {@link #start} or {@link #advance} returned false. */
public final boolean isDone() {
return rangeTracker.isDone();
}
/** Returns true if there has been a call to {@link #start}. */
public final boolean isStarted() {
return rangeTracker.isStarted();
}
/** The {@link OffsetRangeTracker} managing the range and current position of the source. */
private final OffsetRangeTracker rangeTracker;
/** @param source the {@link OffsetBasedSource} to be read by the current reader. */
public OffsetBasedReader(OffsetBasedSource<T> source) {
this.source = source;
this.rangeTracker = new OffsetRangeTracker(source.getStartOffset(), source.getEndOffset());
}
/**
* Returns the <i>starting</i> offset of the {@link Source.Reader#getCurrent current record},
* which has been read by the last successful {@link Source.Reader#start} or {@link
* Source.Reader#advance} call.
*
* <p>If no such call has been made yet, the return value is unspecified.
*
* <p>See {@link RangeTracker} for description of offset semantics.
*/
protected abstract long getCurrentOffset() throws NoSuchElementException;
/**
* Returns whether the current record is at a split point (i.e., whether the current record
* would be the first record to be read by a source with a specified start offset of {@link
* #getCurrentOffset}).
*
* <p>See detailed documentation about split points in {@link RangeTracker}.
*/
protected boolean isAtSplitPoint() throws NoSuchElementException {
return true;
}
@Override
public final boolean start() throws IOException {
return (startImpl() && rangeTracker.tryReturnRecordAt(isAtSplitPoint(), getCurrentOffset()))
|| rangeTracker.markDone();
}
@Override
public final boolean advance() throws IOException {
return (advanceImpl() && rangeTracker.tryReturnRecordAt(isAtSplitPoint(), getCurrentOffset()))
|| rangeTracker.markDone();
}
/**
* Initializes the {@link OffsetBasedSource.OffsetBasedReader} and advances to the first record,
* returning {@code true} if there is a record available to be read. This method will be invoked
* exactly once and may perform expensive setup operations that are needed to initialize the
* reader.
*
* <p>This function is the {@code OffsetBasedReader} implementation of {@link
* BoundedReader#start}. The key difference is that the implementor can ignore the possibility
* that it should no longer produce the first record, either because it has exceeded the
* original {@code endOffset} assigned to the reader, or because a concurrent call to {@link
* #splitAtFraction} has changed the source to shrink the offset range being read.
*
* @see BoundedReader#start
*/
protected abstract boolean startImpl() throws IOException;
/**
* Advances to the next record and returns {@code true}, or returns false if there is no next
* record.
*
* <p>This function is the {@code OffsetBasedReader} implementation of {@link
* BoundedReader#advance}. The key difference is that the implementor can ignore the possibility
* that it should no longer produce the next record, either because it has exceeded the original
* {@code endOffset} assigned to the reader, or because a concurrent call to {@link
* #splitAtFraction} has changed the source to shrink the offset range being read.
*
* @see BoundedReader#advance
*/
protected abstract boolean advanceImpl() throws IOException;
@Override
public synchronized OffsetBasedSource<T> getCurrentSource() {
return source;
}
@Override
public Double getFractionConsumed() {
return rangeTracker.getFractionConsumed();
}
@Override
public long getSplitPointsConsumed() {
return rangeTracker.getSplitPointsProcessed();
}
@Override
public long getSplitPointsRemaining() {
if (isDone()) {
return 0;
} else if (!isStarted()) {
// Note that even if the current source does not allow splitting, we don't know that
// it's non-empty so we return UNKNOWN instead of 1.
return BoundedReader.SPLIT_POINTS_UNKNOWN;
} else if (!allowsDynamicSplitting()) {
// Started (so non-empty) and unsplittable, so only the current task.
return 1;
} else if (getCurrentOffset() >= rangeTracker.getStopPosition() - 1) {
// If this is true, the next element is outside the range. Note that even getCurrentOffset()
// might be larger than the stop position when the current record is not a split point.
return 1;
} else {
// Use the default.
return super.getSplitPointsRemaining();
}
}
/**
* Whether this reader should allow dynamic splitting of the offset ranges.
*
* <p>True by default. Override this to return false if the reader cannot support dynamic
* splitting correctly. If this returns false, {@link OffsetBasedReader#splitAtFraction} will
* refuse all split requests.
*/
public boolean allowsDynamicSplitting() {
return true;
}
@Override
public final synchronized OffsetBasedSource<T> splitAtFraction(double fraction) {
if (!allowsDynamicSplitting()) {
return null;
}
if (rangeTracker.getStopPosition() == Long.MAX_VALUE) {
LOG.debug(
"Refusing to split unbounded OffsetBasedReader {} at fraction {}",
rangeTracker,
fraction);
return null;
}
long splitOffset = rangeTracker.getPositionForFractionConsumed(fraction);
LOG.debug(
"Proposing to split OffsetBasedReader {} at fraction {} (offset {})",
rangeTracker,
fraction,
splitOffset);
long start = source.getStartOffset();
long end = source.getEndOffset();
OffsetBasedSource<T> primary = source.createSourceForSubrange(start, splitOffset);
OffsetBasedSource<T> residual = source.createSourceForSubrange(splitOffset, end);
if (!rangeTracker.trySplitAtPosition(splitOffset)) {
return null;
}
this.source = primary;
return residual;
}
}
}