| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.beam.sdk.io; |
| |
| import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; |
| import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includesDisplayDataFor; |
| import static org.hamcrest.Matchers.instanceOf; |
| import static org.hamcrest.Matchers.not; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertNotNull; |
| import static org.junit.Assert.assertNull; |
| import static org.junit.Assert.assertThat; |
| import static org.junit.Assert.assertTrue; |
| |
| import java.io.ByteArrayOutputStream; |
| import java.io.File; |
| import java.io.FileOutputStream; |
| import java.io.IOException; |
| import java.io.OutputStream; |
| import java.nio.ByteBuffer; |
| import java.nio.channels.ReadableByteChannel; |
| import java.nio.charset.StandardCharsets; |
| import java.util.ArrayList; |
| import java.util.List; |
| import java.util.NoSuchElementException; |
| import java.util.Random; |
| import java.util.zip.GZIPOutputStream; |
| import java.util.zip.ZipEntry; |
| import java.util.zip.ZipOutputStream; |
| import javax.annotation.Nullable; |
| import org.apache.beam.sdk.coders.Coder; |
| import org.apache.beam.sdk.coders.SerializableCoder; |
| import org.apache.beam.sdk.io.BoundedSource.BoundedReader; |
| import org.apache.beam.sdk.io.CompressedSource.CompressedReader; |
| import org.apache.beam.sdk.io.CompressedSource.CompressionMode; |
| import org.apache.beam.sdk.io.CompressedSource.DecompressingChannelFactory; |
| import org.apache.beam.sdk.io.FileBasedSource.FileBasedReader; |
| import org.apache.beam.sdk.io.fs.MatchResult.Metadata; |
| import org.apache.beam.sdk.options.PipelineOptions; |
| import org.apache.beam.sdk.options.PipelineOptionsFactory; |
| import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; |
| import org.apache.beam.sdk.testing.SourceTestUtils; |
| import org.apache.beam.sdk.transforms.DoFn; |
| import org.apache.beam.sdk.transforms.display.DisplayData; |
| import org.apache.beam.sdk.values.KV; |
| import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.HashMultiset; |
| import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists; |
| import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Sets; |
| import org.apache.beam.vendor.guava.v20_0.com.google.common.io.Files; |
| import org.apache.beam.vendor.guava.v20_0.com.google.common.primitives.Bytes; |
| import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream; |
| import org.apache.commons.compress.compressors.deflate.DeflateCompressorOutputStream; |
| import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream; |
| import org.apache.commons.compress.compressors.zstandard.ZstdCompressorOutputStream; |
| import org.joda.time.Instant; |
| import org.junit.Rule; |
| import org.junit.Test; |
| import org.junit.rules.ExpectedException; |
| import org.junit.rules.TemporaryFolder; |
| import org.junit.runner.RunWith; |
| import org.junit.runners.JUnit4; |
| |
| /** Tests for CompressedSource. */ |
| @RunWith(JUnit4.class) |
| public class CompressedSourceTest { |
| |
| @Rule public TemporaryFolder tmpFolder = new TemporaryFolder(); |
| |
| @Rule public ExpectedException thrown = ExpectedException.none(); |
| |
| /** Test reading nonempty input with gzip. */ |
| @Test |
| public void testReadGzip() throws Exception { |
| byte[] input = generateInput(5000); |
| runReadTest(input, CompressionMode.GZIP); |
| } |
| |
| /** Test splittability of files in AUTO mode. */ |
| @Test |
| public void testAutoSplittable() throws Exception { |
| CompressedSource<Byte> source; |
| |
| // GZip files are not splittable |
| source = CompressedSource.from(new ByteSource("input.gz", 1)); |
| assertFalse(source.isSplittable()); |
| source = CompressedSource.from(new ByteSource("input.GZ", 1)); |
| assertFalse(source.isSplittable()); |
| |
| // BZ2 files are not splittable |
| source = CompressedSource.from(new ByteSource("input.bz2", 1)); |
| assertFalse(source.isSplittable()); |
| source = CompressedSource.from(new ByteSource("input.BZ2", 1)); |
| assertFalse(source.isSplittable()); |
| |
| // ZIP files are not splittable |
| source = CompressedSource.from(new ByteSource("input.zip", 1)); |
| assertFalse(source.isSplittable()); |
| source = CompressedSource.from(new ByteSource("input.ZIP", 1)); |
| assertFalse(source.isSplittable()); |
| |
| // ZSTD files are not splittable |
| source = CompressedSource.from(new ByteSource("input.zst", 1)); |
| assertFalse(source.isSplittable()); |
| source = CompressedSource.from(new ByteSource("input.ZST", 1)); |
| assertFalse(source.isSplittable()); |
| source = CompressedSource.from(new ByteSource("input.zstd", 1)); |
| assertFalse(source.isSplittable()); |
| |
| // DEFLATE files are not splittable |
| source = CompressedSource.from(new ByteSource("input.deflate", 1)); |
| assertFalse(source.isSplittable()); |
| source = CompressedSource.from(new ByteSource("input.DEFLATE", 1)); |
| assertFalse(source.isSplittable()); |
| |
| // Other extensions are assumed to be splittable. |
| source = CompressedSource.from(new ByteSource("input.txt", 1)); |
| assertTrue(source.isSplittable()); |
| source = CompressedSource.from(new ByteSource("input.csv", 1)); |
| assertTrue(source.isSplittable()); |
| } |
| |
| /** Test splittability of files in GZIP mode -- none should be splittable. */ |
| @Test |
| public void testGzipSplittable() throws Exception { |
| CompressedSource<Byte> source; |
| |
| // GZip files are not splittable |
| source = |
| CompressedSource.from(new ByteSource("input.gz", 1)) |
| .withDecompression(CompressionMode.GZIP); |
| assertFalse(source.isSplittable()); |
| source = |
| CompressedSource.from(new ByteSource("input.GZ", 1)) |
| .withDecompression(CompressionMode.GZIP); |
| assertFalse(source.isSplittable()); |
| |
| // Other extensions are also not splittable. |
| source = |
| CompressedSource.from(new ByteSource("input.txt", 1)) |
| .withDecompression(CompressionMode.GZIP); |
| assertFalse(source.isSplittable()); |
| source = |
| CompressedSource.from(new ByteSource("input.csv", 1)) |
| .withDecompression(CompressionMode.GZIP); |
| assertFalse(source.isSplittable()); |
| } |
| |
| /** Test reading nonempty input with bzip2. */ |
| @Test |
| public void testReadBzip2() throws Exception { |
| byte[] input = generateInput(5000); |
| runReadTest(input, CompressionMode.BZIP2); |
| } |
| |
| /** Test reading nonempty input with zip. */ |
| @Test |
| public void testReadZip() throws Exception { |
| byte[] input = generateInput(5000); |
| runReadTest(input, CompressionMode.ZIP); |
| } |
| |
| /** Test reading nonempty input with deflate. */ |
| @Test |
| public void testReadDeflate() throws Exception { |
| byte[] input = generateInput(5000); |
| runReadTest(input, CompressionMode.DEFLATE); |
| } |
| |
| /** Test reading empty input with gzip. */ |
| @Test |
| public void testEmptyReadGzip() throws Exception { |
| byte[] input = generateInput(0); |
| runReadTest(input, CompressionMode.GZIP); |
| } |
| |
| /** Test reading empty input with zstd. */ |
| @Test |
| public void testEmptyReadZstd() throws Exception { |
| byte[] input = generateInput(0); |
| runReadTest(input, CompressionMode.ZSTD); |
| } |
| |
| private static byte[] compressGzip(byte[] input) throws IOException { |
| ByteArrayOutputStream res = new ByteArrayOutputStream(); |
| try (GZIPOutputStream gzipStream = new GZIPOutputStream(res)) { |
| gzipStream.write(input); |
| } |
| return res.toByteArray(); |
| } |
| |
| private static byte[] concat(byte[] first, byte[] second) { |
| byte[] res = new byte[first.length + second.length]; |
| System.arraycopy(first, 0, res, 0, first.length); |
| System.arraycopy(second, 0, res, first.length, second.length); |
| return res; |
| } |
| |
| /** |
| * Test a concatenation of gzip files is correctly decompressed. |
| * |
| * <p>A concatenation of gzip files as one file is a valid gzip file and should decompress to be |
| * the concatenation of those individual files. |
| */ |
| @Test |
| public void testReadConcatenatedGzip() throws IOException { |
| byte[] header = "a,b,c\n".getBytes(StandardCharsets.UTF_8); |
| byte[] body = "1,2,3\n4,5,6\n7,8,9\n".getBytes(StandardCharsets.UTF_8); |
| byte[] expected = concat(header, body); |
| byte[] totalGz = concat(compressGzip(header), compressGzip(body)); |
| File tmpFile = tmpFolder.newFile(); |
| try (FileOutputStream os = new FileOutputStream(tmpFile)) { |
| os.write(totalGz); |
| } |
| |
| CompressedSource<Byte> source = |
| CompressedSource.from(new ByteSource(tmpFile.getAbsolutePath(), 1)) |
| .withDecompression(CompressionMode.GZIP); |
| List<Byte> actual = SourceTestUtils.readFromSource(source, PipelineOptionsFactory.create()); |
| assertEquals(Bytes.asList(expected), actual); |
| } |
| |
| /** |
| * Test a bzip2 file containing multiple streams is correctly decompressed. |
| * |
| * <p>A bzip2 file may contain multiple streams and should decompress as the concatenation of |
| * those streams. |
| */ |
| @Test |
| public void testReadMultiStreamBzip2() throws IOException { |
| CompressionMode mode = CompressionMode.BZIP2; |
| byte[] input1 = generateInput(5, 587973); |
| byte[] input2 = generateInput(5, 387374); |
| |
| ByteArrayOutputStream stream1 = new ByteArrayOutputStream(); |
| try (OutputStream os = getOutputStreamForMode(mode, stream1)) { |
| os.write(input1); |
| } |
| |
| ByteArrayOutputStream stream2 = new ByteArrayOutputStream(); |
| try (OutputStream os = getOutputStreamForMode(mode, stream2)) { |
| os.write(input2); |
| } |
| |
| File tmpFile = tmpFolder.newFile(); |
| try (OutputStream os = new FileOutputStream(tmpFile)) { |
| os.write(stream1.toByteArray()); |
| os.write(stream2.toByteArray()); |
| } |
| |
| byte[] output = Bytes.concat(input1, input2); |
| verifyReadContents(output, tmpFile, mode); |
| } |
| |
| /** Test reading empty input with bzip2. */ |
| @Test |
| public void testCompressedReadBzip2() throws Exception { |
| byte[] input = generateInput(0); |
| runReadTest(input, CompressionMode.BZIP2); |
| } |
| |
| /** Test reading empty input with zstd. */ |
| @Test |
| public void testCompressedReadZstd() throws Exception { |
| byte[] input = generateInput(0); |
| runReadTest(input, CompressionMode.ZSTD); |
| } |
| |
| /** Test reading according to filepattern when the file is gzipped. */ |
| @Test |
| public void testCompressedAccordingToFilepatternGzip() throws Exception { |
| byte[] input = generateInput(100); |
| File tmpFile = tmpFolder.newFile("test.gz"); |
| writeFile(tmpFile, input, CompressionMode.GZIP); |
| verifyReadContents(input, tmpFile, null /* default auto decompression factory */); |
| } |
| |
| /** Test reading according to filepattern when the file is bzipped. */ |
| @Test |
| public void testCompressedAccordingToFilepatternBzip2() throws Exception { |
| byte[] input = generateInput(100); |
| File tmpFile = tmpFolder.newFile("test.bz2"); |
| writeFile(tmpFile, input, CompressionMode.BZIP2); |
| verifyReadContents(input, tmpFile, null /* default auto decompression factory */); |
| } |
| |
| /** Test reading according to filepattern when the file is zstd compressed. */ |
| @Test |
| public void testCompressedAccordingToFilepatternZstd() throws Exception { |
| byte[] input = generateInput(100); |
| File tmpFile = tmpFolder.newFile("test.zst"); |
| writeFile(tmpFile, input, CompressionMode.ZSTD); |
| verifyReadContents(input, tmpFile, null /* default auto decompression factory */); |
| } |
| |
| /** Test reading multiple files with different compression. */ |
| @Test |
| public void testHeterogeneousCompression() throws Exception { |
| String baseName = "test-input"; |
| |
| // Expected data |
| byte[] generated = generateInput(1000); |
| List<Byte> expected = new ArrayList<>(); |
| |
| // Every sort of compression |
| File uncompressedFile = tmpFolder.newFile(baseName + ".bin"); |
| generated = generateInput(1000, 1); |
| Files.write(generated, uncompressedFile); |
| expected.addAll(Bytes.asList(generated)); |
| |
| File gzipFile = tmpFolder.newFile(baseName + ".gz"); |
| generated = generateInput(1000, 2); |
| writeFile(gzipFile, generated, CompressionMode.GZIP); |
| expected.addAll(Bytes.asList(generated)); |
| |
| File bzip2File = tmpFolder.newFile(baseName + ".bz2"); |
| generated = generateInput(1000, 3); |
| writeFile(bzip2File, generated, CompressionMode.BZIP2); |
| expected.addAll(Bytes.asList(generated)); |
| |
| File zstdFile = tmpFolder.newFile(baseName + ".zst"); |
| generated = generateInput(1000, 4); |
| writeFile(zstdFile, generated, CompressionMode.ZSTD); |
| expected.addAll(Bytes.asList(generated)); |
| |
| String filePattern = new File(tmpFolder.getRoot().toString(), baseName + ".*").toString(); |
| |
| CompressedSource<Byte> source = CompressedSource.from(new ByteSource(filePattern, 1)); |
| List<Byte> actual = SourceTestUtils.readFromSource(source, PipelineOptionsFactory.create()); |
| assertEquals(HashMultiset.create(actual), HashMultiset.create(expected)); |
| } |
| |
| @Test |
| public void testUncompressedFileWithAutoIsSplittable() throws Exception { |
| String baseName = "test-input"; |
| |
| File uncompressedFile = tmpFolder.newFile(baseName + ".bin"); |
| Files.write(generateInput(10), uncompressedFile); |
| |
| CompressedSource<Byte> source = |
| CompressedSource.from(new ByteSource(uncompressedFile.getPath(), 1)); |
| assertTrue(source.isSplittable()); |
| SourceTestUtils.assertSplitAtFractionExhaustive(source, PipelineOptionsFactory.create()); |
| } |
| |
| @Test |
| public void testUncompressedFileWithUncompressedIsSplittable() throws Exception { |
| String baseName = "test-input"; |
| |
| File uncompressedFile = tmpFolder.newFile(baseName + ".bin"); |
| Files.write(generateInput(10), uncompressedFile); |
| |
| CompressedSource<Byte> source = |
| CompressedSource.from(new ByteSource(uncompressedFile.getPath(), 1)) |
| .withDecompression(CompressionMode.UNCOMPRESSED); |
| assertTrue(source.isSplittable()); |
| SourceTestUtils.assertSplitAtFractionExhaustive(source, PipelineOptionsFactory.create()); |
| } |
| |
| @Test |
| public void testGzipFileIsNotSplittable() throws Exception { |
| String baseName = "test-input"; |
| |
| File compressedFile = tmpFolder.newFile(baseName + ".gz"); |
| writeFile(compressedFile, generateInput(10), CompressionMode.GZIP); |
| |
| CompressedSource<Byte> source = |
| CompressedSource.from(new ByteSource(compressedFile.getPath(), 1)); |
| assertFalse(source.isSplittable()); |
| } |
| |
| @Test |
| public void testBzip2FileIsNotSplittable() throws Exception { |
| String baseName = "test-input"; |
| |
| File compressedFile = tmpFolder.newFile(baseName + ".bz2"); |
| writeFile(compressedFile, generateInput(10), CompressionMode.BZIP2); |
| |
| CompressedSource<Byte> source = |
| CompressedSource.from(new ByteSource(compressedFile.getPath(), 1)); |
| assertFalse(source.isSplittable()); |
| } |
| |
| @Test |
| public void testZstdFileIsNotSplittable() throws Exception { |
| String baseName = "test-input"; |
| |
| File compressedFile = tmpFolder.newFile(baseName + ".zst"); |
| writeFile(compressedFile, generateInput(10), CompressionMode.ZSTD); |
| |
| CompressedSource<Byte> source = |
| CompressedSource.from(new ByteSource(compressedFile.getPath(), 1)); |
| assertFalse(source.isSplittable()); |
| } |
| |
| /** |
| * Test reading an uncompressed file with {@link CompressionMode#GZIP}, since we must support this |
| * due to properties of services that we read from. |
| */ |
| @Test |
| public void testFalseGzipStream() throws Exception { |
| byte[] input = generateInput(1000); |
| File tmpFile = tmpFolder.newFile("test.gz"); |
| Files.write(input, tmpFile); |
| verifyReadContents(input, tmpFile, CompressionMode.GZIP); |
| } |
| |
| /** |
| * Test reading an uncompressed file with {@link CompressionMode#BZIP2}, and show that we fail. |
| */ |
| @Test |
| public void testFalseBzip2Stream() throws Exception { |
| byte[] input = generateInput(1000); |
| File tmpFile = tmpFolder.newFile("test.bz2"); |
| Files.write(input, tmpFile); |
| thrown.expectMessage("Stream is not in the BZip2 format"); |
| verifyReadContents(input, tmpFile, CompressionMode.BZIP2); |
| } |
| |
| /** Test reading an uncompressed file with {@link Compression#ZSTD}, and show that we fail. */ |
| @Test |
| public void testFalseZstdStream() throws Exception { |
| byte[] input = generateInput(1000); |
| File tmpFile = tmpFolder.newFile("test.zst"); |
| Files.write(input, tmpFile); |
| thrown.expectMessage("Decompression error: Unknown frame descriptor"); |
| verifyReadContents(input, tmpFile, CompressionMode.ZSTD); |
| } |
| |
| /** |
| * Test reading an empty input file with gzip; it must be interpreted as uncompressed because the |
| * gzip header is two bytes. |
| */ |
| @Test |
| public void testEmptyReadGzipUncompressed() throws Exception { |
| byte[] input = generateInput(0); |
| File tmpFile = tmpFolder.newFile("test.gz"); |
| Files.write(input, tmpFile); |
| verifyReadContents(input, tmpFile, CompressionMode.GZIP); |
| } |
| |
| /** |
| * Test reading single byte input with gzip; it must be interpreted as uncompressed because the |
| * gzip header is two bytes. |
| */ |
| @Test |
| public void testOneByteReadGzipUncompressed() throws Exception { |
| byte[] input = generateInput(1); |
| File tmpFile = tmpFolder.newFile("test.gz"); |
| Files.write(input, tmpFile); |
| verifyReadContents(input, tmpFile, CompressionMode.GZIP); |
| } |
| |
| /** Test reading multiple files. */ |
| @Test |
| public void testCompressedReadMultipleFiles() throws Exception { |
| int numFiles = 3; |
| String baseName = "test_input-"; |
| String filePattern = new File(tmpFolder.getRoot().toString(), baseName + "*").toString(); |
| List<Byte> expected = new ArrayList<>(); |
| |
| for (int i = 0; i < numFiles; i++) { |
| byte[] generated = generateInput(100); |
| File tmpFile = tmpFolder.newFile(baseName + i); |
| writeFile(tmpFile, generated, CompressionMode.GZIP); |
| expected.addAll(Bytes.asList(generated)); |
| } |
| |
| CompressedSource<Byte> source = |
| CompressedSource.from(new ByteSource(filePattern, 1)) |
| .withDecompression(CompressionMode.GZIP); |
| List<Byte> actual = SourceTestUtils.readFromSource(source, PipelineOptionsFactory.create()); |
| assertEquals(HashMultiset.create(expected), HashMultiset.create(actual)); |
| } |
| |
| @Test |
| public void testDisplayData() { |
| ByteSource inputSource = |
| new ByteSource("foobar.txt", 1) { |
| @Override |
| public void populateDisplayData(DisplayData.Builder builder) { |
| builder.add(DisplayData.item("foo", "bar")); |
| } |
| }; |
| |
| CompressedSource<?> compressedSource = CompressedSource.from(inputSource); |
| CompressedSource<?> gzipSource = compressedSource.withDecompression(CompressionMode.GZIP); |
| |
| DisplayData compressedSourceDisplayData = DisplayData.from(compressedSource); |
| DisplayData gzipDisplayData = DisplayData.from(gzipSource); |
| |
| assertThat(compressedSourceDisplayData, hasDisplayItem("compressionMode")); |
| assertThat(gzipDisplayData, hasDisplayItem("compressionMode", CompressionMode.GZIP.toString())); |
| assertThat(compressedSourceDisplayData, hasDisplayItem("source", inputSource.getClass())); |
| assertThat(compressedSourceDisplayData, includesDisplayDataFor("source", inputSource)); |
| } |
| |
| /** Generate byte array of given size. */ |
| private byte[] generateInput(int size) { |
| // Arbitrary but fixed seed |
| return generateInput(size, 285930); |
| } |
| |
| /** Generate byte array of given size. */ |
| private byte[] generateInput(int size, int seed) { |
| // Arbitrary but fixed seed |
| Random random = new Random(seed); |
| byte[] buff = new byte[size]; |
| random.nextBytes(buff); |
| return buff; |
| } |
| |
| /** Get a compressing stream for a given compression mode. */ |
| private OutputStream getOutputStreamForMode(CompressionMode mode, OutputStream stream) |
| throws IOException { |
| switch (mode) { |
| case GZIP: |
| return new GzipCompressorOutputStream(stream); |
| case BZIP2: |
| return new BZip2CompressorOutputStream(stream); |
| case ZIP: |
| return new TestZipOutputStream(stream); |
| case ZSTD: |
| return new ZstdCompressorOutputStream(stream); |
| case DEFLATE: |
| return new DeflateCompressorOutputStream(stream); |
| default: |
| throw new RuntimeException("Unexpected compression mode"); |
| } |
| } |
| |
| /** Extend of {@link ZipOutputStream} that splits up bytes into multiple entries. */ |
| private static class TestZipOutputStream extends OutputStream { |
| |
| private ZipOutputStream zipOutputStream; |
| private long offset = 0; |
| private int entry = 0; |
| |
| public TestZipOutputStream(OutputStream stream) throws IOException { |
| super(); |
| zipOutputStream = new ZipOutputStream(stream); |
| zipOutputStream.putNextEntry(new ZipEntry(String.format("entry-%05d", entry))); |
| } |
| |
| @Override |
| public void write(int b) throws IOException { |
| zipOutputStream.write(b); |
| offset++; |
| if (offset % 100 == 0) { |
| entry++; |
| zipOutputStream.putNextEntry(new ZipEntry(String.format("entry-%05d", entry))); |
| } |
| } |
| |
| @Override |
| public void close() throws IOException { |
| zipOutputStream.closeEntry(); |
| super.close(); |
| } |
| } |
| |
| /** Writes a single output file. */ |
| private void writeFile(File file, byte[] input, CompressionMode mode) throws IOException { |
| try (OutputStream os = getOutputStreamForMode(mode, new FileOutputStream(file))) { |
| os.write(input); |
| } |
| } |
| |
| /** Run a single read test, writing and reading back input with the given compression mode. */ |
| private void runReadTest( |
| byte[] input, |
| CompressionMode inputCompressionMode, |
| @Nullable DecompressingChannelFactory decompressionFactory) |
| throws IOException { |
| File tmpFile = tmpFolder.newFile(); |
| writeFile(tmpFile, input, inputCompressionMode); |
| verifyReadContents(input, tmpFile, decompressionFactory); |
| } |
| |
| private void verifyReadContents( |
| byte[] expected, File inputFile, @Nullable DecompressingChannelFactory decompressionFactory) |
| throws IOException { |
| CompressedSource<Byte> source = |
| CompressedSource.from(new ByteSource(inputFile.toPath().toString(), 1)); |
| if (decompressionFactory != null) { |
| source = source.withDecompression(decompressionFactory); |
| } |
| List<KV<Long, Byte>> actualOutput = Lists.newArrayList(); |
| try (BoundedReader<Byte> reader = source.createReader(PipelineOptionsFactory.create())) { |
| for (boolean more = reader.start(); more; more = reader.advance()) { |
| actualOutput.add(KV.of(reader.getCurrentTimestamp().getMillis(), reader.getCurrent())); |
| } |
| } |
| List<KV<Long, Byte>> expectedOutput = Lists.newArrayList(); |
| for (int i = 0; i < expected.length; i++) { |
| expectedOutput.add(KV.of((long) i, expected[i])); |
| } |
| assertEquals(expectedOutput, actualOutput); |
| } |
| |
| /** Run a single read test, writing and reading back input with the given compression mode. */ |
| private void runReadTest(byte[] input, CompressionMode mode) throws IOException { |
| runReadTest(input, mode, mode); |
| } |
| |
| /** Dummy source for use in tests. */ |
| private static class ByteSource extends FileBasedSource<Byte> { |
| public ByteSource(String fileOrPatternSpec, long minBundleSize) { |
| super(StaticValueProvider.of(fileOrPatternSpec), minBundleSize); |
| } |
| |
| public ByteSource(Metadata metadata, long minBundleSize, long startOffset, long endOffset) { |
| super(metadata, minBundleSize, startOffset, endOffset); |
| } |
| |
| @Override |
| protected ByteSource createForSubrangeOfFile(Metadata metadata, long start, long end) { |
| return new ByteSource(metadata, getMinBundleSize(), start, end); |
| } |
| |
| @Override |
| protected FileBasedReader<Byte> createSingleFileReader(PipelineOptions options) { |
| return new ByteReader(this); |
| } |
| |
| @Override |
| public Coder<Byte> getOutputCoder() { |
| return SerializableCoder.of(Byte.class); |
| } |
| |
| private static class ByteReader extends FileBasedReader<Byte> { |
| ByteBuffer buff = ByteBuffer.allocate(1); |
| Byte current; |
| long offset; |
| ReadableByteChannel channel; |
| |
| public ByteReader(ByteSource source) { |
| super(source); |
| offset = source.getStartOffset() - 1; |
| } |
| |
| @Override |
| public Byte getCurrent() throws NoSuchElementException { |
| return current; |
| } |
| |
| @Override |
| protected boolean isAtSplitPoint() { |
| return true; |
| } |
| |
| @Override |
| protected void startReading(ReadableByteChannel channel) throws IOException { |
| this.channel = channel; |
| } |
| |
| @Override |
| protected boolean readNextRecord() throws IOException { |
| buff.clear(); |
| if (channel.read(buff) != 1) { |
| return false; |
| } |
| current = buff.get(0); |
| offset += 1; |
| return true; |
| } |
| |
| @Override |
| protected long getCurrentOffset() { |
| return offset; |
| } |
| |
| @Override |
| public Instant getCurrentTimestamp() throws NoSuchElementException { |
| return new Instant(getCurrentOffset()); |
| } |
| } |
| } |
| |
| private static class ExtractIndexFromTimestamp extends DoFn<Byte, KV<Long, Byte>> { |
| @ProcessElement |
| public void processElement(ProcessContext context) { |
| context.output(KV.of(context.timestamp().getMillis(), context.element())); |
| } |
| } |
| |
| @Test |
| public void testEmptyGzipProgress() throws IOException { |
| File tmpFile = tmpFolder.newFile("empty.gz"); |
| String filename = tmpFile.toPath().toString(); |
| writeFile(tmpFile, new byte[0], CompressionMode.GZIP); |
| |
| PipelineOptions options = PipelineOptionsFactory.create(); |
| CompressedSource<Byte> source = CompressedSource.from(new ByteSource(filename, 1)); |
| try (BoundedReader<Byte> readerOrig = source.createReader(options)) { |
| assertThat(readerOrig, instanceOf(CompressedReader.class)); |
| CompressedReader<Byte> reader = (CompressedReader<Byte>) readerOrig; |
| // before starting |
| assertEquals(0.0, reader.getFractionConsumed(), 1e-6); |
| assertEquals(0, reader.getSplitPointsConsumed()); |
| assertEquals(1, reader.getSplitPointsRemaining()); |
| |
| // confirm empty |
| assertFalse(reader.start()); |
| |
| // after reading empty source |
| assertEquals(1.0, reader.getFractionConsumed(), 1e-6); |
| assertEquals(0, reader.getSplitPointsConsumed()); |
| assertEquals(0, reader.getSplitPointsRemaining()); |
| } |
| } |
| |
| @Test |
| public void testGzipProgress() throws IOException { |
| int numRecords = 3; |
| File tmpFile = tmpFolder.newFile("nonempty.gz"); |
| String filename = tmpFile.toPath().toString(); |
| writeFile(tmpFile, new byte[numRecords], CompressionMode.GZIP); |
| |
| PipelineOptions options = PipelineOptionsFactory.create(); |
| CompressedSource<Byte> source = CompressedSource.from(new ByteSource(filename, 1)); |
| try (BoundedReader<Byte> readerOrig = source.createReader(options)) { |
| assertThat(readerOrig, instanceOf(CompressedReader.class)); |
| CompressedReader<Byte> reader = (CompressedReader<Byte>) readerOrig; |
| // before starting |
| assertEquals(0.0, reader.getFractionConsumed(), 1e-6); |
| assertEquals(0, reader.getSplitPointsConsumed()); |
| assertEquals(1, reader.getSplitPointsRemaining()); |
| |
| // confirm has three records |
| for (int i = 0; i < numRecords; ++i) { |
| if (i == 0) { |
| assertTrue(reader.start()); |
| } else { |
| assertTrue(reader.advance()); |
| } |
| assertEquals(0, reader.getSplitPointsConsumed()); |
| assertEquals(1, reader.getSplitPointsRemaining()); |
| } |
| assertFalse(reader.advance()); |
| |
| // after reading empty source |
| assertEquals(1.0, reader.getFractionConsumed(), 1e-6); |
| assertEquals(1, reader.getSplitPointsConsumed()); |
| assertEquals(0, reader.getSplitPointsRemaining()); |
| } |
| } |
| |
| @Test |
| public void testUnsplittable() throws IOException { |
| String baseName = "test-input"; |
| File compressedFile = tmpFolder.newFile(baseName + ".gz"); |
| byte[] input = generateInput(10000); |
| writeFile(compressedFile, input, CompressionMode.GZIP); |
| |
| CompressedSource<Byte> source = |
| CompressedSource.from(new ByteSource(compressedFile.getPath(), 1)); |
| List<Byte> expected = Lists.newArrayList(); |
| for (byte i : input) { |
| expected.add(i); |
| } |
| |
| PipelineOptions options = PipelineOptionsFactory.create(); |
| BoundedReader<Byte> reader = source.createReader(options); |
| |
| List<Byte> actual = Lists.newArrayList(); |
| for (boolean hasNext = reader.start(); hasNext; hasNext = reader.advance()) { |
| actual.add(reader.getCurrent()); |
| // checkpoint every 9 elements |
| if (actual.size() % 9 == 0) { |
| Double fractionConsumed = reader.getFractionConsumed(); |
| assertNotNull(fractionConsumed); |
| assertNull(reader.splitAtFraction(fractionConsumed)); |
| } |
| } |
| assertEquals(expected.size(), actual.size()); |
| assertEquals(Sets.newHashSet(expected), Sets.newHashSet(actual)); |
| } |
| |
| @Test |
| public void testSplittableProgress() throws IOException { |
| File tmpFile = tmpFolder.newFile("nonempty.txt"); |
| String filename = tmpFile.toPath().toString(); |
| Files.write(new byte[2], tmpFile); |
| |
| PipelineOptions options = PipelineOptionsFactory.create(); |
| CompressedSource<Byte> source = CompressedSource.from(new ByteSource(filename, 1)); |
| try (BoundedReader<Byte> readerOrig = source.createReader(options)) { |
| assertThat(readerOrig, not(instanceOf(CompressedReader.class))); |
| assertThat(readerOrig, instanceOf(FileBasedReader.class)); |
| FileBasedReader<Byte> reader = (FileBasedReader<Byte>) readerOrig; |
| |
| // Check preconditions before starting |
| assertEquals(0.0, reader.getFractionConsumed(), 1e-6); |
| assertEquals(0, reader.getSplitPointsConsumed()); |
| assertEquals(BoundedReader.SPLIT_POINTS_UNKNOWN, reader.getSplitPointsRemaining()); |
| |
| // First record: none consumed, unknown remaining. |
| assertTrue(reader.start()); |
| assertEquals(0, reader.getSplitPointsConsumed()); |
| assertEquals(BoundedReader.SPLIT_POINTS_UNKNOWN, reader.getSplitPointsRemaining()); |
| |
| // Second record: 1 consumed, know that we're on the last record. |
| assertTrue(reader.advance()); |
| assertEquals(1, reader.getSplitPointsConsumed()); |
| assertEquals(1, reader.getSplitPointsRemaining()); |
| |
| // Confirm empty and check post-conditions |
| assertFalse(reader.advance()); |
| assertEquals(1.0, reader.getFractionConsumed(), 1e-6); |
| assertEquals(2, reader.getSplitPointsConsumed()); |
| assertEquals(0, reader.getSplitPointsRemaining()); |
| } |
| } |
| } |