// blob: 932ec57a1542eeedd7c12af6eaa757bf6ac7cf9d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.utils;
import com.google.common.base.Predicate;
import com.google.common.base.Strings;
import com.google.common.base.Throwables;
import com.google.common.io.ByteSink;
import com.google.common.io.ByteSource;
import com.google.common.io.ByteStreams;
import com.google.common.io.Files;
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
import org.apache.commons.compress.compressors.snappy.FramedSnappyCompressorInputStream;
import org.apache.commons.compress.compressors.xz.XZCompressorInputStream;
import org.apache.commons.compress.compressors.zstandard.ZstdCompressorInputStream;
import org.apache.druid.guice.annotations.PublicApi;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.IOE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.RetryUtils;
import org.apache.druid.java.util.common.StreamUtils;
import org.apache.druid.java.util.common.io.NativeIO;
import org.apache.druid.java.util.common.logger.Logger;
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
import java.util.Enumeration;
import java.util.stream.Collectors;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;
import java.util.zip.ZipInputStream;
import java.util.zip.ZipOutputStream;
@PublicApi
public class CompressionUtils
{
// Class-wide logger.
private static final Logger log = new Logger(CompressionUtils.class);
// Retry count handed to the RetryUtils.retry, FileUtils.retryCopy and StreamUtils.retryCopy calls in this class.
private static final int DEFAULT_RETRY_COUNT = 3;
// File-name suffixes used to recognize the supported compression / archive formats.
private static final String BZ2_SUFFIX = ".bz2";
private static final String GZ_SUFFIX = ".gz";
private static final String XZ_SUFFIX = ".xz";
private static final String ZIP_SUFFIX = ".zip";
// Used by decompress() to select FramedSnappyCompressorInputStream.
private static final String SNAPPY_SUFFIX = ".sz";
// Used by decompress() to select ZstdCompressorInputStream.
private static final String ZSTD_SUFFIX = ".zst";
/**
 * Zip the contents of directory into the file indicated by outputZipFile. The actual archive writing is
 * delegated to {@link #zip(File, OutputStream)}.
 *
 * A warning is logged (but the archive is still written) when outputZipFile does not end in ".zip".
 *
 * @param directory     The directory whose contents should be added to the zip in the output stream.
 * @param outputZipFile The output file to write the zipped data to. Any previous contents are replaced.
 * @param fsync         True if the output file should be written through {@code FileUtils.writeAtomically}
 *                      (intended to fsync the result to disk); false to write directly into the file.
 *
 * @return The number of bytes (uncompressed) read from the input directory.
 *
 * @throws IOException if the directory cannot be read or the output file cannot be written
 */
public static long zip(File directory, File outputZipFile, boolean fsync) throws IOException
{
  if (!isZip(outputZipFile.getName())) {
    log.warn("No .zip suffix[%s], putting files from [%s] into it anyway.", outputZipFile, directory);
  }

  if (fsync) {
    return FileUtils.writeAtomically(outputZipFile, out -> zip(directory, out));
  } else {
    try (
        final FileChannel fileChannel = FileChannel.open(
            outputZipFile.toPath(),
            StandardOpenOption.WRITE,
            StandardOpenOption.CREATE,
            // Without truncation, overwriting a longer pre-existing file would leave stale bytes after the
            // new archive's central directory and make the zip unreadable.
            StandardOpenOption.TRUNCATE_EXISTING
        );
        final OutputStream out = Channels.newOutputStream(fileChannel)
    ) {
      return zip(directory, out);
    }
  }
}
/**
 * Convenience overload of {@link #zip(File, File, boolean)} that never requests an fsync of the output file.
 *
 * @param directory     The directory whose contents should be added to the zip.
 * @param outputZipFile The file that receives the zipped data.
 *
 * @return The number of bytes (uncompressed) read from the input directory.
 *
 * @throws IOException propagated from {@link #zip(File, File, boolean)}
 */
public static long zip(File directory, File outputZipFile) throws IOException
{
  final boolean fsync = false;
  return zip(directory, outputZipFile, fsync);
}
/**
 * Zips the contents of the input directory to the output stream. Sub directories are skipped; only the
 * direct children of the directory are written, each as an entry named after the file.
 *
 * @param directory The directory whose contents should be added to the zip in the output stream.
 * @param out       The output stream to write the zip data to. Caller is responsible for closing this stream.
 *
 * @return The number of bytes (uncompressed) read from the input directory.
 *
 * @throws IOException if directory is not a directory, cannot be listed, contains a file larger than
 *                     Integer.MAX_VALUE bytes, or if reading/writing fails
 */
public static long zip(File directory, OutputStream out) throws IOException
{
  if (!directory.isDirectory()) {
    throw new IOE("directory[%s] is not a directory", directory);
  }

  // listFiles() returns null (rather than throwing) when the directory cannot be read.
  final File[] files = directory.listFiles();
  if (files == null) {
    throw new IOE("Could not list contents of directory[%s]", directory);
  }
  // Sort entries to make life easier when writing streaming-decompression unit tests.
  Arrays.sort(files);

  final ZipOutputStream zipOut = new ZipOutputStream(out);
  long totalSize = 0;
  for (File file : files) {
    if (file.isDirectory()) {
      // Honor the documented contract: a sub directory cannot be copied as a byte source, so skip it
      // instead of failing after a partial entry has been started.
      log.debug("Skipping sub directory[%s]", file);
      continue;
    }

    final long fileSize = file.length();
    log.debug("Adding file[%s] with size[%,d].  Total size so far[%,d]", file, fileSize, totalSize);
    if (fileSize > Integer.MAX_VALUE) {
      // Terminate the archive cleanly before reporting the oversized file.
      zipOut.finish();
      throw new IOE("file[%s] too large [%,d]", file, fileSize);
    }
    // putNextEntry implicitly closes the previous entry.
    zipOut.putNextEntry(new ZipEntry(file.getName()));
    totalSize += Files.asByteSource(file).copyTo(zipOut);
  }
  zipOut.closeEntry();
  // Workaround for http://hg.openjdk.java.net/jdk8/jdk8/jdk/rev/759aa847dcaf
  zipOut.flush();
  zipOut.finish();

  return totalSize;
}
/**
 * Unzip the byteSource to the output directory. If cacheLocally is true, the byteSource is first copied to a
 * temporary local file, which is then unzipped and deleted afterwards. This may cause more predictable
 * behavior than trying to unzip a large file directly off a network stream, for example.
 *
 * @param byteSource   The ByteSource which supplies the zip data
 * @param outDir       The output directory to put the contents of the zip
 * @param shouldRetry  A predicate expression to determine if a new InputStream should be acquired from ByteSource
 *                     and the copy attempted again. If you want to retry on any exception, use
 *                     {@link FileUtils#IS_EXCEPTION}.
 * @param cacheLocally A boolean flag to indicate if the data should be cached locally
 *
 * @return A FileCopyResult containing the result of writing the zip entries to disk
 *
 * @throws IOException if copying or unzipping fails with an I/O error
 */
public static FileUtils.FileCopyResult unzip(
    final ByteSource byteSource,
    final File outDir,
    final Predicate<Throwable> shouldRetry,
    boolean cacheLocally
) throws IOException
{
  if (cacheLocally) {
    // Pull the whole archive to local disk first, then unzip from the local copy.
    final File localCopy = File.createTempFile("compressionUtilZipCache", ZIP_SUFFIX);
    try {
      FileUtils.retryCopy(byteSource, localCopy, shouldRetry, DEFAULT_RETRY_COUNT);
      return unzip(localCopy, outDir);
    }
    finally {
      if (!localCopy.delete()) {
        log.warn("Could not delete zip cache at [%s]", localCopy.toString());
      }
    }
  }

  // Streaming path: every attempt opens a fresh stream from the source.
  try {
    return RetryUtils.retry(
        () -> unzip(byteSource.openStream(), outDir),
        shouldRetry,
        DEFAULT_RETRY_COUNT
    );
  }
  catch (IOException ioe) {
    throw ioe;
  }
  catch (Exception other) {
    throw Throwables.propagate(other);
  }
}
/**
 * Unzip the pulled file to an output directory. This is only expected to work on zips with lone files, and is
 * not intended for zips with directory structures. Every entry's target path is checked with
 * {@link #validateZipOutputFile(String, File, File)} before it is written.
 *
 * @param pulledFile The file to unzip
 * @param outDir     The directory to store the contents of the file. Must already exist.
 *
 * @return a FileCopyResult of the files which were written to disk
 *
 * @throws IOException if the archive cannot be opened or read
 */
public static FileUtils.FileCopyResult unzip(final File pulledFile, final File outDir) throws IOException
{
  if (!outDir.exists() || !outDir.isDirectory()) {
    throw new ISE("outDir[%s] must exist and be a directory", outDir);
  }
  log.info("Unzipping file[%s] to [%s]", pulledFile, outDir);

  final FileUtils.FileCopyResult copied = new FileUtils.FileCopyResult();
  try (final ZipFile archive = new ZipFile(pulledFile)) {
    for (Enumeration<? extends ZipEntry> entries = archive.entries(); entries.hasMoreElements(); ) {
      final ZipEntry zipEntry = entries.nextElement();
      final File target = new File(outDir, zipEntry.getName());
      validateZipOutputFile(pulledFile.getCanonicalPath(), target, outDir);

      // A re-openable view of this entry, so that the retrying copy can start over from the beginning.
      final ByteSource entrySource = new ByteSource()
      {
        @Override
        public InputStream openStream() throws IOException
        {
          return new BufferedInputStream(archive.getInputStream(zipEntry));
        }
      };
      copied.addFiles(
          FileUtils.retryCopy(entrySource, target, FileUtils.IS_EXCEPTION, DEFAULT_RETRY_COUNT).getFiles()
      );
    }
  }
  return copied;
}
/**
 * Guards against zip entries whose names would place the extracted file outside of the output directory
 * (for example names containing ".." segments). Both paths are canonicalized before being compared.
 *
 * @param sourceFilename Name of the archive being extracted; only used in the error message
 * @param outFile        The file an entry is about to be written to
 * @param outDir         The directory all extracted files must stay within
 *
 * @throws IOException if either path cannot be canonicalized
 * @throws ISE         if the canonical outFile is not located under the canonical outDir
 */
public static void validateZipOutputFile(
    String sourceFilename,
    final File outFile,
    final File outDir
) throws IOException
{
  final File resolvedOutFile = outFile.getCanonicalFile();
  final String resolvedOutDir = outDir.getCanonicalPath();

  // Path.startsWith compares whole path elements, not raw string prefixes.
  if (resolvedOutFile.toPath().startsWith(resolvedOutDir)) {
    return;
  }

  throw new ISE(
      "Unzipped output path[%s] of sourceFile[%s] does not start with outDir[%s].",
      resolvedOutFile,
      sourceFilename,
      resolvedOutDir
  );
}
/**
 * Unzip from the input stream to the output directory, using the entry's file name as the file name in the
 * output directory. The behavior of directories in the input stream's zip is undefined.
 * If possible, it is recommended to use unzip(ByteSource, File, Predicate, boolean) instead.
 *
 * @param in     The input stream of the zip data. This stream is closed
 * @param outDir The directory to copy the unzipped data to
 *
 * @return The FileUtils.FileCopyResult containing information on all the files which were written
 *
 * @throws IOException if reading the zip data or writing an entry fails
 */
public static FileUtils.FileCopyResult unzip(InputStream in, File outDir) throws IOException
{
  try (final ZipInputStream zipStream = new ZipInputStream(in)) {
    final FileUtils.FileCopyResult written = new FileUtils.FileCopyResult();

    for (ZipEntry zipEntry = zipStream.getNextEntry(); zipEntry != null; zipEntry = zipStream.getNextEntry()) {
      final File target = new File(outDir, zipEntry.getName());
      // No source file name is known for a raw stream, hence the empty string.
      validateZipOutputFile("", target, outDir);
      NativeIO.chunkedCopy(zipStream, target);
      written.addFile(target);
      zipStream.closeEntry();
    }

    // Skip the rest of the zip file to work around https://github.com/apache/druid/issues/6905
    final byte[] scratch = new byte[512];
    int bytesRead;
    do {
      bytesRead = in.read(scratch);
    } while (bytesRead != -1);

    return written;
  }
}
/**
 * gunzip the file to the output file. Delegates to {@link #gunzip(ByteSource, File)} with the file wrapped
 * as a ByteSource.
 *
 * @param pulledFile The source of the gz data
 * @param outFile    A target file to put the contents
 *
 * @return The result of the file copy
 */
public static FileUtils.FileCopyResult gunzip(final File pulledFile, File outFile)
{
  final ByteSource gzSource = Files.asByteSource(pulledFile);
  return gunzip(gzSource, outFile);
}
/**
 * Unzips the input stream via a gzip filter. use gunzip(ByteSource, File, Predicate) if possible
 *
 * @param in      The input stream to run through the gunzip filter. This stream is closed, including when
 *                the gzip header cannot be read.
 * @param outFile The file to output to
 *
 * @return A FileCopyResult for outFile
 *
 * @throws IOException if the stream is not valid gzip data or the copy fails
 */
public static FileUtils.FileCopyResult gunzip(InputStream in, File outFile) throws IOException
{
  try (
      // Registered as its own resource: the GZIPInputStream constructor reads the header and may throw,
      // in which case nothing else would close the raw stream.
      final InputStream rawIn = in;
      final GZIPInputStream gzipIn = gzipInputStream(rawIn)
  ) {
    NativeIO.chunkedCopy(gzipIn, outFile);
    return new FileUtils.FileCopyResult(outFile);
  }
}
/**
 * Fixes java bug 7036144 http://bugs.java.com/bugdatabase/view_bug.do?bug_id=7036144 which affects
 * concatenated GZip. The raw stream is wrapped so that it never reports zero available bytes.
 *
 * @param in The raw input stream
 *
 * @return A GZIPInputStream that can handle concatenated gzip streams in the input
 *
 * @throws IOException if the gzip header cannot be read from the stream
 *
 * @see #decompress(InputStream, String) which should be used instead for streams coming from files
 */
public static GZIPInputStream gzipInputStream(final InputStream in) throws IOException
{
  final InputStream neverReportsEmpty = new FilterInputStream(in)
  {
    @Override
    public int available() throws IOException
    {
      final int reported = super.available();
      if (reported != 0) {
        return reported;
      }
      // Hack. Docs say available() should return an estimate,
      // so we estimate about 1KB to work around available == 0 bug in GZIPInputStream
      return 1 << 10;
    }
  };
  return new GZIPInputStream(neverReportsEmpty);
}
/**
 * gunzip from the source stream to the destination stream.
 *
 * @param in  The input stream which is to be decompressed. This stream is closed, including when the gzip
 *            header cannot be read.
 * @param out The output stream to write to. This stream is closed
 *
 * @return The number of bytes written to the output stream.
 *
 * @throws IOException if the input is not valid gzip data or the copy fails. A failure while closing a stream
 *                     is attached as a suppressed exception when an earlier failure already occurred.
 */
public static long gunzip(InputStream in, OutputStream out) throws IOException
{
  // Resources are closed in reverse order: gzip stream, raw input, then output. Declaring the raw input
  // separately guarantees it is closed even if constructing the GZIPInputStream throws.
  try (
      final OutputStream rawOut = out;
      final InputStream rawIn = in;
      final GZIPInputStream gzipIn = gzipInputStream(rawIn)
  ) {
    final long result = ByteStreams.copy(gzipIn, rawOut);
    rawOut.flush();
    return result;
  }
}
/**
 * A gunzip function to store locally. Each copy attempt opens a fresh stream from the source and wraps it
 * with {@link #gzipInputStream(InputStream)}.
 *
 * @param in          The factory to produce input streams
 * @param outFile     The file to store the result into
 * @param shouldRetry A predicate to indicate if the Throwable is recoverable
 *
 * @return The FileCopyResult returned by the retrying copy into outFile
 */
public static FileUtils.FileCopyResult gunzip(
    final ByteSource in,
    final File outFile,
    Predicate<Throwable> shouldRetry
)
{
  final ByteSource decompressingSource = new ByteSource()
  {
    @Override
    public InputStream openStream() throws IOException
    {
      return gzipInputStream(in.openStream());
    }
  };
  return FileUtils.retryCopy(decompressingSource, outFile, shouldRetry, DEFAULT_RETRY_COUNT);
}
/**
 * Gunzip from the byte source to the output file, retrying on any exception
 * (uses {@link FileUtils#IS_EXCEPTION} as the retry predicate).
 *
 * @param in      The compressed source to read from
 * @param outFile The file to write the uncompressed results to
 *
 * @return A FileCopyResult of the file written
 */
public static FileUtils.FileCopyResult gunzip(final ByteSource in, File outFile)
{
  final FileUtils.FileCopyResult copyResult = gunzip(in, outFile, FileUtils.IS_EXCEPTION);
  return copyResult;
}
/**
 * Copy inputStream to out while wrapping out in a GZIPOutputStream
 * Closes both input and output
 *
 * @param inputStream The input stream to copy data from. This stream is closed
 * @param out         The output stream to wrap in a GZIPOutputStream before copying. This stream is closed,
 *                    including when the gzip header cannot be written.
 *
 * @return The number of bytes copied from inputStream, as reported by ByteStreams.copy
 *
 * @throws IOException if reading, compressing or writing fails
 */
public static long gzip(InputStream inputStream, OutputStream out) throws IOException
{
  // Resources are closed in reverse order: gzip stream, raw output, then input. Declaring the raw output
  // separately guarantees it is closed even if the GZIPOutputStream constructor (which writes the header)
  // throws.
  try (
      final InputStream source = inputStream;
      final OutputStream rawOut = out;
      final GZIPOutputStream gzipOut = new GZIPOutputStream(rawOut)
  ) {
    final long result = ByteStreams.copy(source, gzipOut);
    // Write the remaining compressed data and the trailer first, so that the flush pushes a complete gzip
    // stream downstream rather than only the bytes emitted so far.
    gzipOut.finish();
    gzipOut.flush();
    return result;
  }
}
/**
 * Gzips the input file to the output
 *
 * @param inFile      The file to gzip
 * @param outFile     A target file that receives the gzip-compressed contents of inFile
 * @param shouldRetry Predicate on a potential throwable to determine if the copy should be attempted again.
 *
 * @return A FileCopyResult for outFile
 */
public static FileUtils.FileCopyResult gzip(final File inFile, final File outFile, Predicate<Throwable> shouldRetry)
{
  final ByteSource source = Files.asByteSource(inFile);
  final ByteSink sink = Files.asByteSink(outFile);
  gzip(source, sink, shouldRetry);
  return new FileUtils.FileCopyResult(outFile);
}
/**
 * Gzip-compresses the data supplied by in into out via StreamUtils.retryCopy. Every stream opened from the
 * sink is wrapped in a GZIPOutputStream.
 *
 * @param in          The source of uncompressed data
 * @param out         The sink that receives the compressed data
 * @param shouldRetry Predicate on a potential throwable to determine if the copy should be attempted again.
 *
 * @return The value returned by StreamUtils.retryCopy
 */
public static long gzip(final ByteSource in, final ByteSink out, Predicate<Throwable> shouldRetry)
{
  final ByteSink compressingSink = new ByteSink()
  {
    @Override
    public OutputStream openStream() throws IOException
    {
      return new GZIPOutputStream(out.openStream());
    }
  };
  return StreamUtils.retryCopy(in, compressingSink, shouldRetry, DEFAULT_RETRY_COUNT);
}
/**
 * GZip compress the contents of inFile into outFile, retrying on any exception
 * (uses {@link FileUtils#IS_EXCEPTION} as the retry predicate).
 *
 * @param inFile  The source of data
 * @param outFile The destination for compressed data
 *
 * @return A FileCopyResult of the resulting file at outFile
 */
public static FileUtils.FileCopyResult gzip(final File inFile, final File outFile)
{
  final FileUtils.FileCopyResult copyResult = gzip(inFile, outFile, FileUtils.IS_EXCEPTION);
  return copyResult;
}
/**
 * Checks to see if fName is a valid name for a "*.zip" file
 *
 * @param fName The name of the file in question; may be null
 *
 * @return True if fName is non-empty and ends with ".zip", false otherwise
 */
public static boolean isZip(String fName)
{
  // Technically a file named `.zip` would be fine
  return !Strings.isNullOrEmpty(fName) && fName.endsWith(ZIP_SUFFIX);
}
/**
 * Checks to see if fName is a valid name for a "*.gz" file
 *
 * @param fName The name of the file in question; may be null
 *
 * @return True if fName ends with ".gz" and has at least one character before that suffix, false otherwise
 */
public static boolean isGz(String fName)
{
  return !Strings.isNullOrEmpty(fName)
         && fName.length() > GZ_SUFFIX.length()
         && fName.endsWith(GZ_SUFFIX);
}
/**
 * Get the file name without the .gz extension. The name is computed with Guava's
 * {@code Files.getNameWithoutExtension}, which also drops any leading directory components.
 *
 * @param fname The name of the gzip file
 *
 * @return fname without the ".gz" extension
 *
 * @throws IAE if fname is not a valid "*.gz" file name (including null or empty input)
 */
public static String getGzBaseName(String fname)
{
  // Validate first: Files.getNameWithoutExtension rejects null with a NullPointerException, which would
  // otherwise escape instead of the documented IAE.
  if (isGz(fname)) {
    final String reducedFname = Files.getNameWithoutExtension(fname);
    if (!reducedFname.isEmpty()) {
      return reducedFname;
    }
  }
  throw new IAE("[%s] is not a valid gz file name", fname);
}
/**
 * Decompress an input stream from a file, based on the filename. The format is chosen from the file name
 * suffix (.gz, .bz2, .xz, .sz, .zst, .zip); for any other name the input stream is returned unchanged.
 * For .zip, the returned stream reads the first entry of the archive, or is empty if the archive has no
 * entries.
 *
 * @param in       The raw input stream
 * @param fileName The name of the file the stream comes from
 *
 * @return A stream yielding the decompressed data
 *
 * @throws IOException if the decompressing stream cannot be set up
 */
public static InputStream decompress(final InputStream in, final String fileName) throws IOException
{
  if (fileName.endsWith(GZ_SUFFIX)) {
    return gzipInputStream(in);
  }
  if (fileName.endsWith(BZ2_SUFFIX)) {
    return new BZip2CompressorInputStream(in, true);
  }
  if (fileName.endsWith(XZ_SUFFIX)) {
    return new XZCompressorInputStream(in, true);
  }
  if (fileName.endsWith(SNAPPY_SUFFIX)) {
    return new FramedSnappyCompressorInputStream(in);
  }
  if (fileName.endsWith(ZSTD_SUFFIX)) {
    return new ZstdCompressorInputStream(in);
  }
  if (!fileName.endsWith(ZIP_SUFFIX)) {
    // Unknown suffix: hand the stream back untouched.
    return in;
  }

  // This reads the first file in the archive.
  final ZipInputStream zipIn = new ZipInputStream(in, StandardCharsets.UTF_8);
  try {
    if (zipIn.getNextEntry() != null) {
      return zipIn;
    }
    zipIn.close();
    // No files in the archive - return an empty stream.
    return new ByteArrayInputStream(new byte[0]);
  }
  catch (IOException failure) {
    // Do not leak the zip stream when positioning on the first entry fails.
    try {
      zipIn.close();
    }
    catch (IOException closeFailure) {
      failure.addSuppressed(closeFailure);
    }
    throw failure;
  }
}
}