blob: f6fb657fd65409c33f8732cb7d32631b75bf1acb [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
import static;
import static;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.util.NoSuchElementException;
import javax.annotation.concurrent.GuardedBy;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Instant;
* A Source that reads from compressed files. A {@code CompressedSource} wraps a delegate {@link
* FileBasedSource} that is able to read the decompressed file format.
* <p>For example, use the following to read from a gzip-compressed file-based source:
* <pre>{@code
* FileBasedSource<T> mySource = ...;
* PCollection<T> collection = p.apply(Read.from(CompressedSource
* .from(mySource)
* .withCompression(Compression.GZIP)));
* }</pre>
* <p>Supported compression algorithms are {@link Compression#GZIP}, {@link Compression#BZIP2},
* {@link Compression#ZIP}, {@link Compression#ZSTD}, {@link Compression#LZO}, {@link
* Compression#LZOP}, {@link Compression#SNAPPY}, and {@link Compression#DEFLATE}. User-defined
* compression types are supported by implementing a {@link DecompressingChannelFactory}.
* <p>By default, the compression algorithm is selected from those supported in {@link Compression}
* based on the file name provided to the source, namely {@code ".bz2"} indicates {@link
* Compression#BZIP2}, {@code ".gz"} indicates {@link Compression#GZIP}, {@code ".zip"} indicates
* {@link Compression#ZIP}, {@code ".zst"} indicates {@link Compression#ZSTD}, {@code
* ".lzo_deflate"} indicates {@link Compression#LZO}, {@code ".lzo"} indicates {@link
* Compression#LZOP}, {@code ".snappy"} indicates {@link Compression#SNAPPY}, and {@code ".deflate"}
* indicates {@link Compression#DEFLATE}. If the file name does not match any of the supported
* algorithms, it is assumed to be uncompressed data.
* @param <T> The type to read from the compressed file.
"nullness" // TODO(
public class CompressedSource<T> extends FileBasedSource<T> {
/**
 * Factory interface for creating channels that decompress the content of an underlying channel.
 *
 * <p>Implementations are {@link Serializable}.
 */
public interface DecompressingChannelFactory extends Serializable {
  /**
   * Given a channel, create a channel that decompresses the content read from the channel.
   *
   * @param channel the channel supplying the compressed bytes
   * @return a channel yielding the decompressed content of {@code channel}
   * @throws IOException if the decompressing channel cannot be created
   */
  ReadableByteChannel createDecompressingChannel(ReadableByteChannel channel) throws IOException;
}
/** @deprecated Use {@link Compression} instead */
public enum CompressionMode implements DecompressingChannelFactory {
/** @see Compression#UNCOMPRESSED */
/** @see Compression#AUTO */
/** @see Compression#GZIP */
/** @see Compression#BZIP2 */
/** @see Compression#ZIP */
/** @see Compression#ZSTD */
/** @see Compression#LZO */
/** @see Compression#LZOP */
/** @see Compression#DEFLATE */
/** @see Compression#SNAPPY */
private final Compression canonical;
CompressionMode(Compression canonical) {
this.canonical = canonical;
* Returns {@code true} if the given file name implies that the contents are compressed
* according to the compression embodied by this factory.
public boolean matches(String fileName) {
return canonical.matches(fileName);
public ReadableByteChannel createDecompressingChannel(ReadableByteChannel channel)
throws IOException {
return canonical.readDecompressed(channel);
/** Returns whether the file's extension matches of one of the known compression formats. */
public static boolean isCompressed(String filename) {
return Compression.AUTO.isCompressed(filename);
static DecompressingChannelFactory fromCanonical(Compression compression) {
switch (compression) {
case AUTO:
return AUTO;
case GZIP:
return GZIP;
case BZIP2:
return BZIP2;
case ZIP:
return ZIP;
case ZSTD:
return ZSTD;
case LZO:
return LZO;
case LZOP:
return LZOP;
return DEFLATE;
case SNAPPY:
return SNAPPY;
throw new IllegalArgumentException("Unsupported compression type: " + compression);
private final FileBasedSource<T> sourceDelegate;
private final DecompressingChannelFactory channelFactory;
* Creates a {@code CompressedSource} from an underlying {@code FileBasedSource}. The type of
* compression used will be based on the file name extension unless explicitly configured via
* {@link CompressedSource#withDecompression}.
public static <T> CompressedSource<T> from(FileBasedSource<T> sourceDelegate) {
return new CompressedSource<>(sourceDelegate, CompressionMode.AUTO);
* Return a {@code CompressedSource} that is like this one but will decompress its underlying file
* with the given {@link DecompressingChannelFactory}.
public CompressedSource<T> withDecompression(DecompressingChannelFactory channelFactory) {
return new CompressedSource<>(this.sourceDelegate, channelFactory);
/** Like {@link #withDecompression} but takes a canonical {@link Compression}. */
public CompressedSource<T> withCompression(Compression compression) {
return withDecompression(CompressionMode.fromCanonical(compression));
* Creates a {@code CompressedSource} from a delegate file based source and a decompressing
* channel factory.
// Stores the delegate source and the decompressing channel factory.
// NOTE(review): no superclass constructor call is visible in this view even though the class
// extends FileBasedSource, and the closing brace is absent — confirm against the complete file.
private CompressedSource(
FileBasedSource<T> sourceDelegate, DecompressingChannelFactory channelFactory) {
this.sourceDelegate = sourceDelegate;
this.channelFactory = channelFactory;
* Creates a {@code CompressedSource} for an individual file. Used by {@link
* CompressedSource#createForSubrangeOfFile}.
// Single-file constructor: forwards the file metadata, minimum bundle size and offset range to
// the superclass, then records the delegate source and the channel factory.
private CompressedSource(
FileBasedSource<T> sourceDelegate,
DecompressingChannelFactory channelFactory,
Metadata metadata,
long minBundleSize,
long startOffset,
long endOffset) {
super(metadata, minBundleSize, startOffset, endOffset);
this.sourceDelegate = sourceDelegate;
this.channelFactory = channelFactory;
// isSplittable() reads sourceDelegate and channelFactory, so it is evaluated only after both
// fields have been assigned above.
boolean splittable;
try {
splittable = isSplittable();
} catch (Exception e) {
throw new RuntimeException("Failed to determine if the source is splittable", e);
// NOTE(review): the two lines below read like the condition and message of a precondition check
// (a non-splittable source must start at offset 0); the enclosing call and its final argument
// are not visible in this view — confirm against the complete file.
splittable || startOffset == 0,
"CompressedSources must start reading at offset 0. Requested offset: %s",
* Validates that the delegate source is a valid source and that the channel factory is not null.
// NOTE(review): the body of this method is not visible in this view; the accompanying
// description says it validates the delegate source and checks the channel factory for null —
// confirm against the complete file.
public void validate() {
* Creates a {@code CompressedSource} for a subrange of a file. Called by superclass to create a
* source for a single file.
// Builds a CompressedSource for the given file and offset range; the delegate is first asked to
// create its own source for the same metadata and range, and that becomes the new delegate.
// NOTE(review): the remaining constructor arguments are not visible in this view — confirm
// against the complete file.
protected FileBasedSource<T> createForSubrangeOfFile(Metadata metadata, long start, long end) {
return new CompressedSource<>(
sourceDelegate.createForSubrangeOfFile(metadata, start, end),
* Determines whether a single file represented by this source is splittable. Returns true if we
* are using the default decompression factory and and it determines from the requested file name
* that the file is not compressed.
protected final boolean isSplittable() {
try {
if (!sourceDelegate.isSplittable()) {
return false;
} catch (Exception e) {
throw new RuntimeException(e);
if (channelFactory == CompressionMode.UNCOMPRESSED) {
return true;
if (channelFactory == CompressionMode.AUTO) {
return !Compression.AUTO.isCompressed(getFileOrPatternSpec());
return false;
* Creates a {@code FileBasedReader} to read a single file.
* <p>Uses the delegate source to create a single file reader for the delegate source. Utilizes
* the default decompression channel factory to not wrap the source reader if the file name does
* not represent a compressed file allowing for splitting of the source.
protected final FileBasedReader<T> createSingleFileReader(PipelineOptions options) {
if (isSplittable()) {
return sourceDelegate.createSingleFileReader(options);
return new CompressedReader<>(this, sourceDelegate.createSingleFileReader(options));
// Registers display data describing the delegate source and the compression mode in use.
// NOTE(review): the receivers of the chained calls below (presumably `builder`) are not visible
// in this view — confirm against the complete file.
public void populateDisplayData(DisplayData.Builder builder) {
// We explicitly do not register base-class data, instead we use the delegate inner source.
.include("source", sourceDelegate)
.add(DisplayData.item("source", sourceDelegate.getClass()).withLabel("Read Source"));
if (channelFactory instanceof Enum) {
// Channel factories that are enum constants (such as CompressionMode values) are reported by
// their .name() value instead of their class; any other factory is reported by its class.
DisplayData.item("compressionMode", ((Enum) channelFactory).name())
.withLabel("Compression Mode"));
} else {
DisplayData.item("compressionMode", channelFactory.getClass())
.withLabel("Compression Mode"));
/** Returns the delegate source's output coder. */
public final Coder<T> getOutputCoder() {
return sourceDelegate.getOutputCoder();
public final DecompressingChannelFactory getChannelFactory() {
return channelFactory;
* Reader for a {@link CompressedSource}. Decompresses its input and uses a delegate reader to
* read elements from the decompressed input.
* @param <T> The type of records read from the source.
public static class CompressedReader<T> extends FileBasedReader<T> {
private final FileBasedReader<T> readerDelegate;
private final Object progressLock = new Object();
private long numRecordsRead;
// Initialized in startReading
private @Nullable CountingChannel channel;
private DecompressingChannelFactory channelFactory;
/** Create a {@code CompressedReader} from a {@code CompressedSource} and delegate reader. */
// The channel factory is taken from the source; the delegate reader is stored for later use.
// NOTE(review): `source` is only used here to fetch the channel factory, and no superclass
// constructor call is visible in this view — confirm against the complete file.
public CompressedReader(CompressedSource<T> source, FileBasedReader<T> readerDelegate) {
this.channelFactory = source.getChannelFactory();
this.readerDelegate = readerDelegate;
/** Gets the current record from the delegate reader. */
public T getCurrent() throws NoSuchElementException {
return readerDelegate.getCurrent();
public boolean allowsDynamicSplitting() {
return false;
public final long getSplitPointsConsumed() {
synchronized (progressLock) {
return (isDone() && numRecordsRead > 0) ? 1 : 0;
public final long getSplitPointsRemaining() {
return isDone() ? 0 : 1;
/** Returns true only for the first record; compressed sources cannot be split. */
protected final boolean isAtSplitPoint() {
// We have to return true for the first record, but not for the state before reading it,
// and not for the state after reading any other record. Hence == rather than >= or <=.
// This is required because FileBasedReader is intended for readers that can read a range
// of offsets in a file and where the range can be split in parts. CompressedReader,
// however, is a degenerate case because it cannot be split, but it has to satisfy the
// semantics of offsets and split points anyway.
synchronized (progressLock) {
return numRecordsRead == 1;
private static class CountingChannel implements ReadableByteChannel {
long count;
private final ReadableByteChannel inner;
public CountingChannel(ReadableByteChannel inner, long count) {
this.inner = inner;
this.count = count;
public long getCount() {
return count;
public int read(ByteBuffer dst) throws IOException {
int bytes =;
if (bytes > 0) {
// Avoid the -1 from EOF.
count += bytes;
return bytes;
public boolean isOpen() {
return inner.isOpen();
public void close() throws IOException {
* Creates a decompressing channel from the input channel and passes it to its delegate reader's
* {@link FileBasedReader#startReading(ReadableByteChannel)}.
// Wraps the incoming channel in a CountingChannel whose counter starts at the current source's
// start offset, then branches on whether the channel factory is CompressionMode.AUTO.
// NOTE(review): the assignment targets on the next two lines (presumably the `channel` field of
// this reader) and the bodies of both branches are not visible in this view — confirm against
// the complete file before changing this method.
protected final void startReading(ReadableByteChannel channel) throws IOException {
synchronized (progressLock) { = new CountingChannel(channel, getCurrentSource().getStartOffset());
channel =;
if (channelFactory == CompressionMode.AUTO) {
} else {
/** Reads the next record via the delegate reader. */
protected final boolean readNextRecord() throws IOException {
if (!readerDelegate.readNextRecord()) {
return false;
synchronized (progressLock) {
return true;
// Unsplittable: returns the offset in the input stream that has been read by the input.
// these positions are likely to be coarse-grained (in the event of buffering) and
// over-estimates (because they reflect the number of bytes read to produce an element, not its
// start) but both of these provide better data than e.g., reporting the start of the file.
protected final long getCurrentOffset() throws NoSuchElementException {
synchronized (progressLock) {
if (numRecordsRead <= 1) {
// Since the first record is at a split point, it should start at the beginning of the
// file. This avoids the bad case where the decompressor read the entire file, which
// would cause the file to be treated as empty when returning channel.getCount() as it
// is outside the valid range.
return 0;
return channel.getCount();
public Instant getCurrentTimestamp() throws NoSuchElementException {
return readerDelegate.getCurrentTimestamp();