blob: e4f3624e36558905b3ae392e4d57d2da0ef212b8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io;
import java.io.IOException;
import java.io.InputStream;
import java.io.PushbackInputStream;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.util.Locale;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.ByteStreams;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Ints;
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream;
import org.apache.commons.compress.compressors.deflate.DeflateCompressorInputStream;
import org.apache.commons.compress.compressors.deflate.DeflateCompressorOutputStream;
import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
import org.apache.commons.compress.compressors.zstandard.ZstdCompressorInputStream;
import org.apache.commons.compress.compressors.zstandard.ZstdCompressorOutputStream;
/**
 * Various compression types for reading/writing files.
 *
 * <p>Each constant carries a suggested filename suffix (used when writing) and a list of suffixes
 * that are recognized when detecting the compression type from a filename (used when reading).
 */
@SuppressWarnings("ImmutableEnumChecker")
public enum Compression {
  /**
   * When reading a file, automatically determine the compression type based on filename extension.
   * Not applicable when writing files.
   */
  AUTO("") {
    @Override
    public ReadableByteChannel readDecompressed(ReadableByteChannel channel) {
      throw new UnsupportedOperationException(
          "Must resolve compression into a concrete value before calling readDecompressed()");
    }

    @Override
    public WritableByteChannel writeCompressed(WritableByteChannel channel) {
      throw new UnsupportedOperationException("AUTO is applicable only to reading files");
    }
  },

  /** No compression. Both directions hand back the given channel unchanged. */
  UNCOMPRESSED("") {
    @Override
    public ReadableByteChannel readDecompressed(ReadableByteChannel channel) {
      return channel;
    }

    @Override
    public WritableByteChannel writeCompressed(WritableByteChannel channel) {
      return channel;
    }
  },

  /** GZip compression. */
  GZIP(".gz", ".gz") {
    @Override
    public ReadableByteChannel readDecompressed(ReadableByteChannel channel) throws IOException {
      // Determine if the input stream is gzipped. The input stream returned from the
      // GCS connector may already be decompressed; GCS does this based on the
      // content-encoding property.
      PushbackInputStream stream = new PushbackInputStream(Channels.newInputStream(channel), 2);
      byte[] headerBytes = new byte[2];
      int bytesRead =
          ByteStreams.read(
              stream /* source */, headerBytes /* dest */, 0 /* offset */, 2 /* len */);
      // Push back whatever was peeked so the consumer sees the stream from its first byte.
      stream.unread(headerBytes, 0, bytesRead);
      if (bytesRead >= 2) {
        byte zero = 0x00;
        // The two header bytes are combined low-byte-first before comparing with the magic number.
        int header = Ints.fromBytes(zero, zero, headerBytes[1], headerBytes[0]);
        if (header == GZIPInputStream.GZIP_MAGIC) {
          // Second argument: decompressConcatenated = true.
          return Channels.newChannel(new GzipCompressorInputStream(stream, true));
        }
      }
      // Fewer than two bytes, or no gzip magic: pass the bytes through untouched.
      return Channels.newChannel(stream);
    }

    @Override
    public WritableByteChannel writeCompressed(WritableByteChannel channel) throws IOException {
      // Second argument: syncFlush = true.
      return Channels.newChannel(new GZIPOutputStream(Channels.newOutputStream(channel), true));
    }
  },

  /** BZip compression. */
  BZIP2(".bz2", ".bz2") {
    @Override
    public ReadableByteChannel readDecompressed(ReadableByteChannel channel) throws IOException {
      // Second argument: decompressConcatenated = true.
      return Channels.newChannel(
          new BZip2CompressorInputStream(Channels.newInputStream(channel), true));
    }

    @Override
    public WritableByteChannel writeCompressed(WritableByteChannel channel) throws IOException {
      return Channels.newChannel(
          new BZip2CompressorOutputStream(Channels.newOutputStream(channel)));
    }
  },

  /** Zip compression. Reading concatenates all entries of the archive; writing is unsupported. */
  ZIP(".zip", ".zip") {
    @Override
    public ReadableByteChannel readDecompressed(ReadableByteChannel channel) throws IOException {
      FullZipInputStream zip = new FullZipInputStream(Channels.newInputStream(channel));
      return Channels.newChannel(zip);
    }

    @Override
    public WritableByteChannel writeCompressed(WritableByteChannel channel) throws IOException {
      throw new UnsupportedOperationException("Writing ZIP files is currently unsupported");
    }
  },

  /**
   * ZStandard compression.
   *
   * <p>The {@code .zst} extension is specified in <a
   * href="https://tools.ietf.org/html/rfc8478">RFC 8478</a>.
   *
   * <p>The Beam Java SDK does not pull in the Zstd library by default, so it is the user's
   * responsibility to declare an explicit dependency on {@code zstd-jni}. Attempts to read or write
   * .zst files without {@code zstd-jni} loaded will result in {@code NoClassDefFoundError} at
   * runtime.
   */
  ZSTD(".zst", ".zst", ".zstd") {
    @Override
    public ReadableByteChannel readDecompressed(ReadableByteChannel channel) throws IOException {
      return Channels.newChannel(
          new ZstdCompressorInputStream(Channels.newInputStream(channel)));
    }

    @Override
    public WritableByteChannel writeCompressed(WritableByteChannel channel) throws IOException {
      return Channels.newChannel(
          new ZstdCompressorOutputStream(Channels.newOutputStream(channel)));
    }
  },

  /** Deflate compression. */
  DEFLATE(".deflate", ".deflate", ".zlib") {
    @Override
    public ReadableByteChannel readDecompressed(ReadableByteChannel channel) throws IOException {
      return Channels.newChannel(
          new DeflateCompressorInputStream(Channels.newInputStream(channel)));
    }

    @Override
    public WritableByteChannel writeCompressed(WritableByteChannel channel) throws IOException {
      return Channels.newChannel(
          new DeflateCompressorOutputStream(Channels.newOutputStream(channel)));
    }
  };

  private final String suggestedSuffix;
  private final ImmutableList<String> detectedSuffixes;

  /**
   * @param suggestedSuffix the suffix reported by {@link #getSuggestedSuffix()}
   * @param detectedSuffixes the lower-case suffixes that {@link #matches(String)} recognizes
   */
  Compression(String suggestedSuffix, String... detectedSuffixes) {
    this.suggestedSuffix = suggestedSuffix;
    this.detectedSuffixes = ImmutableList.copyOf(detectedSuffixes);
  }

  /** Returns the suggested filename suffix for this compression type (may be empty). */
  public String getSuggestedSuffix() {
    return suggestedSuffix;
  }

  /**
   * Returns whether {@code filename}, compared case-insensitively, ends with one of the suffixes
   * recognized for this compression type.
   *
   * <p>Lower-casing uses {@link Locale#ROOT} so that the result does not depend on the JVM's
   * default locale (e.g. under a Turkish locale {@code "I"} would otherwise lower-case to a dotless
   * {@code "ı"} and {@code ".ZIP"} would fail to match {@code ".zip"}).
   */
  public boolean matches(String filename) {
    // Lower-case once rather than once per candidate suffix.
    String lowerCased = filename.toLowerCase(Locale.ROOT);
    for (String suffix : detectedSuffixes) {
      if (lowerCased.endsWith(suffix)) {
        return true;
      }
    }
    return false;
  }

  /**
   * Returns whether a file with the given name is considered compressed under this setting.
   *
   * <p>For {@link #AUTO} the type is first resolved with {@link #detect(String)}; every resolved
   * type other than {@link #UNCOMPRESSED} counts as compressed.
   */
  public boolean isCompressed(String filename) {
    Compression compression = this;
    if (compression == AUTO) {
      compression = detect(filename);
    }
    return compression != UNCOMPRESSED;
  }

  /**
   * Returns the first compression type, in declaration order, whose {@link #matches(String)}
   * accepts {@code filename}, or {@link #UNCOMPRESSED} if none does.
   */
  public static Compression detect(String filename) {
    for (Compression value : values()) {
      if (value.matches(filename)) {
        return value;
      }
    }
    return UNCOMPRESSED;
  }

  /**
   * Wraps {@code channel} in a channel that yields the decompressed bytes.
   *
   * @throws IOException if setting up the decompressing stream fails
   * @throws UnsupportedOperationException for {@link #AUTO}, which must be resolved first
   */
  public abstract ReadableByteChannel readDecompressed(ReadableByteChannel channel)
      throws IOException;

  /**
   * Wraps {@code channel} in a channel that compresses the bytes written to it.
   *
   * @throws IOException if setting up the compressing stream fails
   * @throws UnsupportedOperationException for {@link #AUTO} and {@link #ZIP}
   */
  public abstract WritableByteChannel writeCompressed(WritableByteChannel channel)
      throws IOException;

  /**
   * Presents the contents of all entries of a zip archive as one continuous stream, moving on to
   * the next {@link ZipEntry} whenever the current one is exhausted.
   */
  private static class FullZipInputStream extends InputStream {
    private final ZipInputStream zipInputStream;

    /** Opens the archive and positions the stream at its first entry, if there is one. */
    public FullZipInputStream(InputStream is) throws IOException {
      super();
      zipInputStream = new ZipInputStream(is);
      zipInputStream.getNextEntry();
    }

    @Override
    public int read() throws IOException {
      int result = zipInputStream.read();
      // -1 only marks the end of the current entry; keep advancing until data or no more entries.
      while (result == -1) {
        ZipEntry nextEntry = zipInputStream.getNextEntry();
        if (nextEntry == null) {
          return -1;
        }
        result = zipInputStream.read();
      }
      return result;
    }

    @Override
    public int read(byte[] b, int off, int len) throws IOException {
      int result = zipInputStream.read(b, off, len);
      // -1 only marks the end of the current entry; keep advancing until data or no more entries.
      while (result == -1) {
        ZipEntry nextEntry = zipInputStream.getNextEntry();
        if (nextEntry == null) {
          return -1;
        }
        result = zipInputStream.read(b, off, len);
      }
      return result;
    }

    /**
     * Closes the wrapped {@link ZipInputStream}. Without this override the inherited {@link
     * InputStream#close()} does nothing, so closing the channel built on top of this stream would
     * leave the underlying input open.
     */
    @Override
    public void close() throws IOException {
      zipInputStream.close();
    }
  }
}