blob: f46f466057ffc02676b0d4b78ea84c3538e06161 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkNotNull;
import java.io.IOException;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.util.NoSuchElementException;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.joda.time.Instant;
/**
* A Source that reads from compressed files. A {@code CompressedSource} wraps a delegate {@link
* FileBasedSource} that is able to read the decompressed file format.
*
* <p>For example, use the following to read from a gzip-compressed file-based source:
*
* <pre>{@code
* FileBasedSource<T> mySource = ...;
* PCollection<T> collection = p.apply(Read.from(CompressedSource
* .from(mySource)
* .withCompression(Compression.GZIP)));
* }</pre>
*
* <p>Supported compression algorithms are {@link Compression#GZIP}, {@link Compression#BZIP2},
* {@link Compression#ZIP}, {@link Compression#ZSTD}, and {@link Compression#DEFLATE}. User-defined
* compression types are supported by implementing a {@link DecompressingChannelFactory}.
*
* <p>By default, the compression algorithm is selected from those supported in {@link Compression}
* based on the file name provided to the source, namely {@code ".bz2"} indicates {@link
* Compression#BZIP2}, {@code ".gz"} indicates {@link Compression#GZIP}, {@code ".zip"} indicates
* {@link Compression#ZIP}, {@code ".zst"} indicates {@link Compression#ZSTD}, and {@code
* ".deflate"} indicates {@link Compression#DEFLATE}. If the file name does not match any of the
* supported algorithms, it is assumed to be uncompressed data.
*
* @param <T> The type to read from the compressed file.
*/
@Experimental(Experimental.Kind.SOURCE_SINK)
public class CompressedSource<T> extends FileBasedSource<T> {
/**
 * Factory interface for creating channels that decompress the content of an underlying channel.
 */
public interface DecompressingChannelFactory extends Serializable {
  /**
   * Given a channel, create a channel that decompresses the content read from the channel.
   *
   * @param channel the raw (possibly compressed) byte channel to wrap
   * @return a channel that yields the decompressed content of {@code channel}
   * @throws IOException if the decompressing channel cannot be created
   */
  ReadableByteChannel createDecompressingChannel(ReadableByteChannel channel) throws IOException;
}
/** @deprecated Use {@link Compression} instead */
@Deprecated
public enum CompressionMode implements DecompressingChannelFactory {
  /** @see Compression#UNCOMPRESSED */
  UNCOMPRESSED(Compression.UNCOMPRESSED),
  /** @see Compression#AUTO */
  AUTO(Compression.AUTO),
  /** @see Compression#GZIP */
  GZIP(Compression.GZIP),
  /** @see Compression#BZIP2 */
  BZIP2(Compression.BZIP2),
  /** @see Compression#ZIP */
  ZIP(Compression.ZIP),
  /** @see Compression#ZSTD */
  ZSTD(Compression.ZSTD),
  /** @see Compression#DEFLATE */
  DEFLATE(Compression.DEFLATE);

  // The canonical Compression value that this legacy mode delegates all behavior to.
  private final Compression canonical;

  CompressionMode(Compression canonical) {
    this.canonical = canonical;
  }

  /**
   * Returns {@code true} if the given file name implies that the contents are compressed
   * according to the compression embodied by this factory.
   */
  public boolean matches(String fileName) {
    return canonical.matches(fileName);
  }

  @Override
  public ReadableByteChannel createDecompressingChannel(ReadableByteChannel channel)
      throws IOException {
    return canonical.readDecompressed(channel);
  }

  /** Returns whether the file's extension matches one of the known compression formats. */
  public static boolean isCompressed(String filename) {
    return Compression.AUTO.isCompressed(filename);
  }

  /** Maps a canonical {@link Compression} to the equivalent legacy enum constant. */
  static DecompressingChannelFactory fromCanonical(Compression compression) {
    switch (compression) {
      case AUTO:
        return AUTO;
      case UNCOMPRESSED:
        return UNCOMPRESSED;
      case GZIP:
        return GZIP;
      case BZIP2:
        return BZIP2;
      case ZIP:
        return ZIP;
      case ZSTD:
        return ZSTD;
      case DEFLATE:
        return DEFLATE;
      default:
        // Defensive: new Compression values must be mapped here explicitly.
        throw new IllegalArgumentException("Unsupported compression type: " + compression);
    }
  }
}
private final FileBasedSource<T> sourceDelegate;
private final DecompressingChannelFactory channelFactory;
/**
 * Creates a {@code CompressedSource} wrapping the given {@code FileBasedSource}. Unless
 * explicitly configured via {@link CompressedSource#withDecompression}, the compression type is
 * chosen from the file name extension.
 */
public static <T> CompressedSource<T> from(FileBasedSource<T> sourceDelegate) {
  // AUTO selects the decompressor per file based on its extension.
  CompressedSource<T> source = new CompressedSource<>(sourceDelegate, CompressionMode.AUTO);
  return source;
}
/**
 * Returns a {@code CompressedSource} identical to this one, except that it decompresses its
 * underlying file with the given {@link DecompressingChannelFactory}.
 */
public CompressedSource<T> withDecompression(DecompressingChannelFactory channelFactory) {
  return new CompressedSource<>(sourceDelegate, channelFactory);
}
/** Like {@link #withDecompression} but takes a canonical {@link Compression}. */
public CompressedSource<T> withCompression(Compression compression) {
  DecompressingChannelFactory factory = CompressionMode.fromCanonical(compression);
  return withDecompression(factory);
}
/**
 * Creates a {@code CompressedSource} from a delegate file based source and a decompressing
 * channel factory.
 */
private CompressedSource(
    FileBasedSource<T> sourceDelegate, DecompressingChannelFactory channelFactory) {
  super(
      sourceDelegate.getFileOrPatternSpecProvider(),
      sourceDelegate.getEmptyMatchTreatment(),
      // NOTE(review): presumably the minimum bundle size; Long.MAX_VALUE appears intended to
      // suppress offset-based splitting of compressed files — confirm against FileBasedSource.
      Long.MAX_VALUE);
  this.sourceDelegate = sourceDelegate;
  this.channelFactory = channelFactory;
}
/**
 * Creates a {@code CompressedSource} for an individual file. Used by {@link
 * CompressedSource#createForSubrangeOfFile}.
 */
private CompressedSource(
    FileBasedSource<T> sourceDelegate,
    DecompressingChannelFactory channelFactory,
    Metadata metadata,
    long minBundleSize,
    long startOffset,
    long endOffset) {
  super(metadata, minBundleSize, startOffset, endOffset);
  // Fields must be assigned before isSplittable() is called below: it reads both
  // sourceDelegate and channelFactory.
  this.sourceDelegate = sourceDelegate;
  this.channelFactory = channelFactory;
  boolean splittable;
  try {
    splittable = isSplittable();
  } catch (Exception e) {
    throw new RuntimeException("Failed to determine if the source is splittable", e);
  }
  // A non-splittable (i.e. compressed) file must be decompressed from the beginning, so any
  // subrange of it must start at offset 0.
  checkArgument(
      splittable || startOffset == 0,
      "CompressedSources must start reading at offset 0. Requested offset: %s",
      startOffset);
}
/**
 * Validates that the delegate source is a valid source and that the channel factory is not null.
 *
 * @throws NullPointerException if the delegate source or channel factory is null
 */
@Override
public void validate() {
  super.validate();
  // Descriptive messages make the failure diagnosable; a bare checkNotNull reports nothing.
  checkNotNull(sourceDelegate, "sourceDelegate must not be null");
  sourceDelegate.validate();
  checkNotNull(channelFactory, "channelFactory must not be null");
}
/**
 * Creates a {@code CompressedSource} for a subrange of a file. Called by superclass to create a
 * source for a single file.
 */
@Override
protected FileBasedSource<T> createForSubrangeOfFile(Metadata metadata, long start, long end) {
  // Build the delegate's subrange first, then wrap it with the same decompression factory.
  FileBasedSource<T> delegateSubrange =
      sourceDelegate.createForSubrangeOfFile(metadata, start, end);
  return new CompressedSource<>(
      delegateSubrange, channelFactory, metadata, sourceDelegate.getMinBundleSize(), start, end);
}
/**
 * Determines whether a single file represented by this source is splittable. Returns true only
 * when the delegate source is splittable and either the content is declared uncompressed, or the
 * default (AUTO) factory is in use and the requested file name does not indicate a known
 * compression format.
 */
@Override
protected final boolean isSplittable() {
  boolean delegateSplittable;
  try {
    delegateSplittable = sourceDelegate.isSplittable();
  } catch (Exception e) {
    // Preserve the cause but add context; a bare RuntimeException(e) is undiagnosable.
    throw new RuntimeException("Failed to determine if the delegate source is splittable", e);
  }
  if (!delegateSplittable) {
    return false;
  }
  if (channelFactory == CompressionMode.UNCOMPRESSED) {
    return true;
  }
  if (channelFactory == CompressionMode.AUTO) {
    // With AUTO, the file is splittable exactly when its name matches no known compressed
    // extension.
    return !Compression.AUTO.isCompressed(getFileOrPatternSpec());
  }
  // Any explicit or user-supplied decompressing factory: assume compressed, hence unsplittable.
  return false;
}
/**
 * Creates a {@code FileBasedReader} to read a single file.
 *
 * <p>Uses the delegate source to create a single file reader for the delegate source. When the
 * source is splittable (the file is not compressed under the default factory), the delegate
 * reader is returned unwrapped so the source can be split.
 */
@Override
protected final FileBasedReader<T> createSingleFileReader(PipelineOptions options) {
  boolean splittable = isSplittable();
  FileBasedReader<T> delegateReader = sourceDelegate.createSingleFileReader(options);
  return splittable ? delegateReader : new CompressedReader<>(this, delegateReader);
}
/** Registers display data for this source, delegating source details to the inner source. */
@Override
public void populateDisplayData(DisplayData.Builder builder) {
  // We explicitly do not register base-class data, instead we use the delegate inner source.
  builder
      .include("source", sourceDelegate)
      .add(DisplayData.item("source", sourceDelegate.getClass()).withLabel("Read Source"));
  if (channelFactory instanceof Enum) {
    // The built-in factories (CompressionMode constants such as GZIP, BZIP2, ZIP, ZSTD and
    // DEFLATE) are enum constants; enum constant classes may be anonymous, so report the
    // .name() value instead of the class. Use the bounded wildcard to avoid a raw Enum type.
    builder.add(
        DisplayData.item("compressionMode", ((Enum<?>) channelFactory).name())
            .withLabel("Compression Mode"));
  } else {
    builder.add(
        DisplayData.item("compressionMode", channelFactory.getClass())
            .withLabel("Compression Mode"));
  }
}
/**
 * Returns the delegate source's output coder.
 *
 * @return the {@link Coder} used by the wrapped delegate source
 */
@Override
public final Coder<T> getOutputCoder() {
  return sourceDelegate.getOutputCoder();
}
/** Returns the {@link DecompressingChannelFactory} used to decompress this source's files. */
public final DecompressingChannelFactory getChannelFactory() {
  return channelFactory;
}
/**
* Reader for a {@link CompressedSource}. Decompresses its input and uses a delegate reader to
* read elements from the decompressed input.
*
* @param <T> The type of records read from the source.
*/
public static class CompressedReader<T> extends FileBasedReader<T> {
private final FileBasedReader<T> readerDelegate;
private final Object progressLock = new Object();
@GuardedBy("progressLock")
private long numRecordsRead;
@Nullable // Initialized in startReading
@GuardedBy("progressLock")
private CountingChannel channel;
private DecompressingChannelFactory channelFactory;
/** Create a {@code CompressedReader} from a {@code CompressedSource} and delegate reader. */
public CompressedReader(CompressedSource<T> source, FileBasedReader<T> readerDelegate) {
  super(source);
  // Cache the factory locally; it decides how channels are wrapped in startReading.
  this.readerDelegate = readerDelegate;
  this.channelFactory = source.getChannelFactory();
}
/**
 * Gets the current record from the delegate reader.
 *
 * @throws NoSuchElementException if the delegate reader has no current record
 */
@Override
public T getCurrent() throws NoSuchElementException {
  return readerDelegate.getCurrent();
}
/** Always false: a compressed stream must be consumed sequentially and cannot be split. */
@Override
public boolean allowsDynamicSplitting() {
  return false;
}
/** Reports 1 once the single split point (the first record) has been fully consumed. */
@Override
public final long getSplitPointsConsumed() {
  synchronized (progressLock) {
    if (isDone() && numRecordsRead > 0) {
      return 1;
    }
    return 0;
  }
}
/** Reports the single remaining split point until the reader is done. */
@Override
public final long getSplitPointsRemaining() {
  if (isDone()) {
    return 0;
  }
  return 1;
}
/** Returns true only for the first record; compressed sources cannot be split. */
@Override
protected final boolean isAtSplitPoint() {
  // We have to return true for the first record, but not for the state before reading it,
  // and not for the state after reading any other record. Hence == rather than >= or <=.
  // This is required because FileBasedReader is intended for readers that can read a range
  // of offsets in a file and where the range can be split in parts. CompressedReader,
  // however, is a degenerate case because it cannot be split, but it has to satisfy the
  // semantics of offsets and split points anyway.
  synchronized (progressLock) {
    return numRecordsRead == 1;
  }
}
/**
 * A pass-through {@link ReadableByteChannel} that counts the total number of bytes read from
 * the wrapped channel, starting from a given initial offset.
 */
private static class CountingChannel implements ReadableByteChannel {
  // Bytes consumed so far, seeded with the source's start offset. Private: mutated only via
  // read(), observed only via getCount().
  private long count;
  private final ReadableByteChannel inner;

  public CountingChannel(ReadableByteChannel inner, long count) {
    this.inner = inner;
    this.count = count;
  }

  /** Returns the number of bytes read so far, including the initial offset. */
  public long getCount() {
    return count;
  }

  @Override
  public int read(ByteBuffer dst) throws IOException {
    int bytes = inner.read(dst);
    if (bytes > 0) {
      // Avoid the -1 from EOF.
      count += bytes;
    }
    return bytes;
  }

  @Override
  public boolean isOpen() {
    return inner.isOpen();
  }

  @Override
  public void close() throws IOException {
    inner.close();
  }
}
/**
 * Creates a decompressing channel from the input channel and passes it to its delegate reader's
 * {@link FileBasedReader#startReading(ReadableByteChannel)}.
 */
@Override
protected final void startReading(ReadableByteChannel channel) throws IOException {
  synchronized (progressLock) {
    // Wrap the raw channel so byte-level progress can be reported via getCurrentOffset();
    // counting starts at the source's start offset. The parameter is deliberately rebound so
    // the rest of this method reads through the counting wrapper.
    this.channel = new CountingChannel(channel, getCurrentSource().getStartOffset());
    channel = this.channel;
  }
  if (channelFactory == CompressionMode.AUTO) {
    // AUTO: pick the decompressor from the current file's name at read time.
    readerDelegate.startReading(
        Compression.detect(getCurrentSource().getFileOrPatternSpec())
            .readDecompressed(channel));
  } else {
    readerDelegate.startReading(channelFactory.createDecompressingChannel(channel));
  }
}
/** Reads the next record via the delegate reader, tracking the count of records read. */
@Override
protected final boolean readNextRecord() throws IOException {
  boolean hasNext = readerDelegate.readNextRecord();
  if (hasNext) {
    // Record count feeds isAtSplitPoint/getSplitPointsConsumed; guard it with the lock.
    synchronized (progressLock) {
      numRecordsRead++;
    }
  }
  return hasNext;
}
// Unsplittable: reports the offset the underlying channel has consumed. These positions are
// likely coarse-grained (in the event of buffering) and over-estimates (they reflect the bytes
// read to produce an element, not its start), but both provide better data than e.g. always
// reporting the start of the file.
@Override
protected final long getCurrentOffset() throws NoSuchElementException {
  synchronized (progressLock) {
    if (numRecordsRead > 1) {
      return channel.getCount();
    }
    // Since the first record is at a split point, it should start at the beginning of the
    // file. This avoids the bad case where the decompressor read the entire file, which would
    // cause the file to be treated as empty when returning channel.getCount() as it is outside
    // the valid range.
    return 0;
  }
}
/** Returns the timestamp of the current record, as reported by the delegate reader. */
@Override
public Instant getCurrentTimestamp() throws NoSuchElementException {
  return readerDelegate.getCurrentTimestamp();
}
}
}