| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.beam.sdk.io; |
| |
| import static java.nio.charset.StandardCharsets.UTF_8; |
| import static org.apache.beam.sdk.TestUtils.LINES_ARRAY; |
| import static org.apache.beam.sdk.TestUtils.NO_LINES_ARRAY; |
| import static org.apache.beam.sdk.io.Compression.AUTO; |
| import static org.apache.beam.sdk.io.Compression.BZIP2; |
| import static org.apache.beam.sdk.io.Compression.DEFLATE; |
| import static org.apache.beam.sdk.io.Compression.GZIP; |
| import static org.apache.beam.sdk.io.Compression.UNCOMPRESSED; |
| import static org.apache.beam.sdk.io.Compression.ZIP; |
| import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; |
| import static org.hamcrest.MatcherAssert.assertThat; |
| import static org.hamcrest.Matchers.containsInAnyOrder; |
| import static org.hamcrest.Matchers.equalTo; |
| import static org.hamcrest.Matchers.greaterThan; |
| import static org.hamcrest.Matchers.hasSize; |
| import static org.hamcrest.Matchers.startsWith; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertNotNull; |
| import static org.junit.Assert.assertTrue; |
| import static org.junit.Assume.assumeFalse; |
| |
| import java.io.BufferedWriter; |
| import java.io.File; |
| import java.io.FileOutputStream; |
| import java.io.IOException; |
| import java.io.OutputStream; |
| import java.io.PrintStream; |
| import java.io.Writer; |
| import java.nio.charset.Charset; |
| import java.nio.file.Files; |
| import java.nio.file.Path; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.List; |
| import java.util.zip.GZIPOutputStream; |
| import java.util.zip.ZipEntry; |
| import java.util.zip.ZipOutputStream; |
| import org.apache.beam.sdk.Pipeline; |
| import org.apache.beam.sdk.coders.StringUtf8Coder; |
| import org.apache.beam.sdk.io.fs.EmptyMatchTreatment; |
| import org.apache.beam.sdk.options.PipelineOptions; |
| import org.apache.beam.sdk.options.PipelineOptionsFactory; |
| import org.apache.beam.sdk.options.ValueProvider; |
| import org.apache.beam.sdk.testing.NeedsRunner; |
| import org.apache.beam.sdk.testing.PAssert; |
| import org.apache.beam.sdk.testing.SourceTestUtils; |
| import org.apache.beam.sdk.testing.TestPipeline; |
| import org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo; |
| import org.apache.beam.sdk.transforms.Create; |
| import org.apache.beam.sdk.transforms.DoFn; |
| import org.apache.beam.sdk.transforms.PTransform; |
| import org.apache.beam.sdk.transforms.ParDo; |
| import org.apache.beam.sdk.transforms.ToString; |
| import org.apache.beam.sdk.transforms.Watch; |
| import org.apache.beam.sdk.transforms.display.DisplayData; |
| import org.apache.beam.sdk.transforms.windowing.AfterPane; |
| import org.apache.beam.sdk.transforms.windowing.FixedWindows; |
| import org.apache.beam.sdk.transforms.windowing.Repeatedly; |
| import org.apache.beam.sdk.transforms.windowing.Window; |
| import org.apache.beam.sdk.util.CoderUtils; |
| import org.apache.beam.sdk.values.PCollection; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Joiner; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists; |
| import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream; |
| import org.apache.commons.compress.compressors.deflate.DeflateCompressorOutputStream; |
| import org.apache.commons.lang3.SystemUtils; |
| import org.joda.time.Duration; |
| import org.junit.Rule; |
| import org.junit.Test; |
| import org.junit.experimental.categories.Category; |
| import org.junit.experimental.runners.Enclosed; |
| import org.junit.rules.TemporaryFolder; |
| import org.junit.runner.RunWith; |
| import org.junit.runners.JUnit4; |
| import org.junit.runners.Parameterized; |
| |
| /** Tests for {@link TextIO.Read}. */ |
| @RunWith(Enclosed.class) |
| public class TextIOReadTest { |
  // Number of lines in the LARGE fixture; chosen so the uncompressed file is large enough to
  // span multiple bundles in the splitting tests below.
  private static final int LINES_NUMBER_FOR_LARGE = 1000;
  // Shared line fixtures used across the nested test classes.
  private static final List<String> EMPTY = Collections.emptyList();
  private static final List<String> TINY =
      Arrays.asList("Irritable eagle", "Optimistic jay", "Fanciful hawk");

  // Highly compressible content ("word0", "word1", ...) so compressed variants shrink noticeably.
  private static final List<String> LARGE = makeLines(LINES_NUMBER_FOR_LARGE);
| |
| private static File writeToFile( |
| List<String> lines, TemporaryFolder folder, String fileName, Compression compression) |
| throws IOException { |
| File file = folder.getRoot().toPath().resolve(fileName).toFile(); |
| OutputStream output = new FileOutputStream(file); |
| switch (compression) { |
| case UNCOMPRESSED: |
| break; |
| case GZIP: |
| output = new GZIPOutputStream(output); |
| break; |
| case BZIP2: |
| output = new BZip2CompressorOutputStream(output); |
| break; |
| case ZIP: |
| ZipOutputStream zipOutput = new ZipOutputStream(output); |
| zipOutput.putNextEntry(new ZipEntry("entry")); |
| output = zipOutput; |
| break; |
| case DEFLATE: |
| output = new DeflateCompressorOutputStream(output); |
| break; |
| default: |
| throw new UnsupportedOperationException(compression.toString()); |
| } |
| writeToStreamAndClose(lines, output); |
| return file; |
| } |
| |
| /** |
| * Helper that writes the given lines (adding a newline in between) to a stream, then closes the |
| * stream. |
| */ |
| private static void writeToStreamAndClose(List<String> lines, OutputStream outputStream) { |
| try (PrintStream writer = new PrintStream(outputStream)) { |
| for (String line : lines) { |
| writer.println(line); |
| } |
| } |
| } |
| |
| /** Helper to make an array of compressible strings. Returns ["word"i] for i in range(0,n). */ |
| private static List<String> makeLines(int n) { |
| List<String> ret = new ArrayList<>(); |
| for (int i = 0; i < n; ++i) { |
| ret.add("word" + i); |
| } |
| return ret; |
| } |
| |
  /**
   * Helper method that runs a variety of ways to read a single file using TextIO and checks that
   * they all match the given expected output.
   *
   * <p>The transforms being verified are:
   *
   * <ul>
   *   <li>TextIO.read().from(filename).withCompression(compressionType)
   *   <li>TextIO.read().from(filename).withCompression(compressionType) .withHintMatchesManyFiles()
   *   <li>TextIO.readFiles().withCompression(compressionType)
   *   <li>TextIO.readAll().withCompression(compressionType)
   * </ul>
   *
   * <p>Assertions are registered on {@code p}; the caller is responsible for running the pipeline.
   */
  private static void assertReadingCompressedFileMatchesExpected(
      File file, Compression compression, List<String> expected, Pipeline p) {

    TextIO.Read read = TextIO.read().from(file.getPath()).withCompression(compression);

    // TextIO.read(), with and without the many-files hint. Transform names embed the file and
    // compression so multiple invocations on the same pipeline stay unique.
    PAssert.that(p.apply("Read_" + file + "_" + compression.toString(), read))
        .containsInAnyOrder(expected);
    PAssert.that(
            p.apply(
                "Read_" + file + "_" + compression.toString() + "_many",
                read.withHintMatchesManyFiles()))
        .containsInAnyOrder(expected);

    // FileIO.matchAll() + readMatches() + TextIO.readFiles().
    PAssert.that(
            p.apply("Create_Paths_ReadFiles_" + file, Create.of(file.getPath()))
                .apply("Match_" + file, FileIO.matchAll())
                .apply("ReadMatches_" + file, FileIO.readMatches().withCompression(compression))
                .apply("ReadFiles_" + compression.toString(), TextIO.readFiles()))
        .containsInAnyOrder(expected);

    // TextIO.readAll() over a PCollection of file patterns.
    PAssert.that(
            p.apply("Create_Paths_ReadAll_" + file, Create.of(file.getPath()))
                .apply(
                    "ReadAll_" + compression.toString(),
                    TextIO.readAll().withCompression(compression)))
        .containsInAnyOrder(expected);
  }
| |
| /** |
| * Create a zip file with the given lines. |
| * |
| * @param expected A list of expected lines, populated in the zip file. |
| * @param folder A temporary folder used to create files. |
| * @param filename Optionally zip file name (can be null). |
| * @param fieldsEntries Fields to write in zip entries. |
| * @return The zip filename. |
| * @throws Exception In case of a failure during zip file creation. |
| */ |
| private static File createZipFile( |
| List<String> expected, TemporaryFolder folder, String filename, String[]... fieldsEntries) |
| throws Exception { |
| File tmpFile = folder.getRoot().toPath().resolve(filename).toFile(); |
| |
| ZipOutputStream out = new ZipOutputStream(new FileOutputStream(tmpFile)); |
| PrintStream writer = new PrintStream(out, true /* auto-flush on write */); |
| |
| int index = 0; |
| for (String[] entry : fieldsEntries) { |
| out.putNextEntry(new ZipEntry(Integer.toString(index))); |
| for (String field : entry) { |
| writer.println(field); |
| expected.add(field); |
| } |
| out.closeEntry(); |
| index++; |
| } |
| |
| writer.close(); |
| out.close(); |
| |
| return tmpFile; |
| } |
| |
  /**
   * Writes {@code data} to a fresh temp file and returns a {@link TextSource} reading it with the
   * given custom {@code delimiter} (null selects the default newline handling).
   */
  private static TextSource prepareSource(
      TemporaryFolder temporaryFolder, byte[] data, byte[] delimiter) throws IOException {
    Path path = temporaryFolder.newFile().toPath();
    Files.write(path, data);
    return new TextSource(
        ValueProvider.StaticValueProvider.of(path.toString()),
        EmptyMatchTreatment.DISALLOW,
        delimiter);
  }
| |
| private static String getFileSuffix(Compression compression) { |
| switch (compression) { |
| case UNCOMPRESSED: |
| return ".txt"; |
| case GZIP: |
| return ".gz"; |
| case BZIP2: |
| return ".bz2"; |
| case ZIP: |
| return ".zip"; |
| case DEFLATE: |
| return ".deflate"; |
| default: |
| return ""; |
| } |
| } |
| |
  /** Tests for reading from different size of files with various Compression. */
  @RunWith(Parameterized.class)
  public static class CompressedReadTest {
    @Rule public TemporaryFolder tempFolder = new TemporaryFolder();
    @Rule public TestPipeline p = TestPipeline.create();

    /** Cross product of the {EMPTY, TINY, LARGE} fixtures and every supported compression. */
    @Parameterized.Parameters(name = "{index}: {1}")
    public static Iterable<Object[]> data() {
      return ImmutableList.<Object[]>builder()
          .add(new Object[] {EMPTY, UNCOMPRESSED})
          .add(new Object[] {EMPTY, GZIP})
          .add(new Object[] {EMPTY, BZIP2})
          .add(new Object[] {EMPTY, ZIP})
          .add(new Object[] {EMPTY, DEFLATE})
          .add(new Object[] {TINY, UNCOMPRESSED})
          .add(new Object[] {TINY, GZIP})
          .add(new Object[] {TINY, BZIP2})
          .add(new Object[] {TINY, ZIP})
          .add(new Object[] {TINY, DEFLATE})
          .add(new Object[] {LARGE, UNCOMPRESSED})
          .add(new Object[] {LARGE, GZIP})
          .add(new Object[] {LARGE, BZIP2})
          .add(new Object[] {LARGE, ZIP})
          .add(new Object[] {LARGE, DEFLATE})
          .build();
    }

    @Parameterized.Parameter(0)
    public List<String> lines;

    @Parameterized.Parameter(1)
    public Compression compression;

    /** Tests reading from a compressed file with no extension (codec must be set explicitly). */
    @Test
    @Category(NeedsRunner.class)
    public void testCompressedReadWithoutExtension() throws Exception {
      String fileName = lines.size() + "_" + compression + "_no_extension";
      File fileWithNoExtension = writeToFile(lines, tempFolder, fileName, compression);
      assertReadingCompressedFileMatchesExpected(fileWithNoExtension, compression, lines, p);
      p.run();
    }

    /** Tests reading from a compressed file whose name carries the matching extension. */
    @Test
    @Category(NeedsRunner.class)
    public void testCompressedReadWithExtension() throws Exception {
      String fileName =
          lines.size() + "_" + compression + "_no_extension" + getFileSuffix(compression);
      File fileWithExtension = writeToFile(lines, tempFolder, fileName, compression);

      // Sanity check that we're properly testing compression.
      if (lines.size() == LINES_NUMBER_FOR_LARGE && !compression.equals(UNCOMPRESSED)) {
        File uncompressedFile = writeToFile(lines, tempFolder, "large.txt", UNCOMPRESSED);
        assertThat(uncompressedFile.length(), greaterThan(fileWithExtension.length()));
      }

      assertReadingCompressedFileMatchesExpected(fileWithExtension, compression, lines, p);
      p.run();
    }

    /** Tests that AUTO mode infers the codec from the file extension. */
    @Test
    @Category(NeedsRunner.class)
    public void testReadWithAuto() throws Exception {
      // The file is written with the extension matching its compression, so AUTO must decode it.
      String fileName =
          lines.size() + "_" + compression + "_no_extension" + getFileSuffix(compression);
      File fileWithExtension = writeToFile(lines, tempFolder, fileName, compression);
      assertReadingCompressedFileMatchesExpected(fileWithExtension, AUTO, lines, p);
      p.run();
    }
  }
| |
  /** Tests for reading files with various delimiters. */
  @RunWith(Parameterized.class)
  public static class ReadWithDelimiterTest {
    private static final ImmutableList<String> EXPECTED = ImmutableList.of("asdf", "hjkl", "xyz");
    @Rule public TemporaryFolder tempFolder = new TemporaryFolder();

    /**
     * Raw file content paired with the lines a default-delimiter read should produce. Covers LF,
     * CR, CRLF, mixed terminators, and content with/without a trailing terminator.
     */
    @Parameterized.Parameters(name = "{index}: {0}")
    public static Iterable<Object[]> data() {
      return ImmutableList.<Object[]>builder()
          .add(new Object[] {"\n\n\n", ImmutableList.of("", "", "")})
          .add(new Object[] {"asdf\nhjkl\nxyz\n", EXPECTED})
          .add(new Object[] {"asdf\rhjkl\rxyz\r", EXPECTED})
          .add(new Object[] {"asdf\r\nhjkl\r\nxyz\r\n", EXPECTED})
          .add(new Object[] {"asdf\rhjkl\r\nxyz\n", EXPECTED})
          .add(new Object[] {"asdf\nhjkl\nxyz", EXPECTED})
          .add(new Object[] {"asdf\rhjkl\rxyz", EXPECTED})
          .add(new Object[] {"asdf\r\nhjkl\r\nxyz", EXPECTED})
          .add(new Object[] {"asdf\rhjkl\r\nxyz", EXPECTED})
          .build();
    }

    @Parameterized.Parameter(0)
    public String line;

    @Parameterized.Parameter(1)
    public ImmutableList<String> expected;

    @Test
    public void testReadLinesWithDelimiter() throws Exception {
      runTestReadWithData(line.getBytes(UTF_8), expected);
    }

    /** Verifies the source splits consistently at every fraction for this content. */
    @Test
    public void testSplittingSource() throws Exception {
      TextSource source = prepareSource(line.getBytes(UTF_8));
      SourceTestUtils.assertSplitAtFractionExhaustive(source, PipelineOptionsFactory.create());
    }

    // Convenience overload: default (null) delimiter.
    private TextSource prepareSource(byte[] data) throws IOException {
      return TextIOReadTest.prepareSource(tempFolder, data, null);
    }

    private void runTestReadWithData(byte[] data, List<String> expectedResults) throws Exception {
      TextSource source = prepareSource(data);
      List<String> actual = SourceTestUtils.readFromSource(source, PipelineOptionsFactory.create());
      assertThat(
          actual, containsInAnyOrder(new ArrayList<>(expectedResults).toArray(new String[0])));
    }
  }
| |
| /** Tests for some basic operations in {@link TextIO.Read}. */ |
| @RunWith(JUnit4.class) |
| public static class BasicIOTest { |
| @Rule public TemporaryFolder tempFolder = new TemporaryFolder(); |
| @Rule public TestPipeline p = TestPipeline.create(); |
| |
| private void runTestRead(String[] expected) throws Exception { |
| File tmpFile = tempFolder.newFile(); |
| String filename = tmpFile.getPath(); |
| |
| try (PrintStream writer = new PrintStream(new FileOutputStream(tmpFile))) { |
| for (String elem : expected) { |
| byte[] encodedElem = CoderUtils.encodeToByteArray(StringUtf8Coder.of(), elem); |
| String line = new String(encodedElem, Charsets.UTF_8); |
| writer.println(line); |
| } |
| } |
| |
| TextIO.Read read = TextIO.read().from(filename); |
| PCollection<String> output = p.apply(read); |
| |
| PAssert.that(output).containsInAnyOrder(expected); |
| p.run(); |
| } |
| |
| @Test |
| public void testDelimiterSelfOverlaps() { |
| assertFalse(TextIO.Read.isSelfOverlapping(new byte[] {'a', 'b', 'c'})); |
| assertFalse(TextIO.Read.isSelfOverlapping(new byte[] {'c', 'a', 'b', 'd', 'a', 'b'})); |
| assertFalse(TextIO.Read.isSelfOverlapping(new byte[] {'a', 'b', 'c', 'a', 'b', 'd'})); |
| assertTrue(TextIO.Read.isSelfOverlapping(new byte[] {'a', 'b', 'a'})); |
| assertTrue(TextIO.Read.isSelfOverlapping(new byte[] {'a', 'b', 'c', 'a', 'b'})); |
| } |
| |
| @Test |
| @Category(NeedsRunner.class) |
| public void testReadStringsWithCustomDelimiter() throws Exception { |
| final String[] inputStrings = |
| new String[] { |
| // incomplete delimiter |
| "To be, or not to be: that |is the question: ", |
| // incomplete delimiter |
| "To be, or not to be: that *is the question: ", |
| // complete delimiter |
| "Whether 'tis nobler in the mind to suffer |*", |
| // truncated delimiter |
| "The slings and arrows of outrageous fortune,|" |
| }; |
| |
| File tmpFile = tempFolder.newFile("tmpfile.txt"); |
| String filename = tmpFile.getPath(); |
| |
| try (Writer writer = Files.newBufferedWriter(tmpFile.toPath(), UTF_8)) { |
| writer.write(Joiner.on("").join(inputStrings)); |
| } |
| |
| PAssert.that(p.apply(TextIO.read().from(filename).withDelimiter(new byte[] {'|', '*'}))) |
| .containsInAnyOrder( |
| "To be, or not to be: that |is the question: To be, or not to be: " |
| + "that *is the question: Whether 'tis nobler in the mind to suffer ", |
| "The slings and arrows of outrageous fortune,|"); |
| p.run(); |
| } |
| |
| @Test |
| public void testSplittingSourceWithCustomDelimiter() throws Exception { |
| List<String> testCases = Lists.newArrayList(); |
| String infix = "first|*second|*|*third"; |
| String[] affixes = new String[] {"", "|", "*", "|*"}; |
| for (String prefix : affixes) { |
| for (String suffix : affixes) { |
| testCases.add(prefix + infix + suffix); |
| } |
| } |
| for (String testCase : testCases) { |
| SourceTestUtils.assertSplitAtFractionExhaustive( |
| TextIOReadTest.prepareSource( |
| tempFolder, testCase.getBytes(UTF_8), new byte[] {'|', '*'}), |
| PipelineOptionsFactory.create()); |
| } |
| } |
| |
| @Test |
| @Category(NeedsRunner.class) |
| public void testReadStrings() throws Exception { |
| runTestRead(LINES_ARRAY); |
| } |
| |
| @Test |
| @Category(NeedsRunner.class) |
| public void testReadEmptyStrings() throws Exception { |
| runTestRead(NO_LINES_ARRAY); |
| } |
| |
| @Test |
| public void testReadNamed() throws Exception { |
| File emptyFile = tempFolder.newFile(); |
| p.enableAbandonedNodeEnforcement(false); |
| |
| assertThat(p.apply(TextIO.read().from("somefile")).getName(), startsWith("TextIO.Read/Read")); |
| assertThat( |
| p.apply("MyRead", TextIO.read().from(emptyFile.getPath())).getName(), |
| startsWith("MyRead/Read")); |
| } |
| |
| @Test |
| public void testReadDisplayData() { |
| TextIO.Read read = TextIO.read().from("foo.*").withCompression(BZIP2); |
| |
| DisplayData displayData = DisplayData.from(read); |
| |
| assertThat(displayData, hasDisplayItem("filePattern", "foo.*")); |
| assertThat(displayData, hasDisplayItem("compressionType", BZIP2.toString())); |
| } |
| |
| /** Options for testing. */ |
| public interface RuntimeTestOptions extends PipelineOptions { |
| ValueProvider<String> getInput(); |
| |
| void setInput(ValueProvider<String> value); |
| } |
| |
| @Test |
| public void testRuntimeOptionsNotCalledInApply() throws Exception { |
| p.enableAbandonedNodeEnforcement(false); |
| |
| RuntimeTestOptions options = PipelineOptionsFactory.as(RuntimeTestOptions.class); |
| |
| p.apply(TextIO.read().from(options.getInput())); |
| } |
| |
| @Test |
| public void testCompressionIsSet() throws Exception { |
| TextIO.Read read = TextIO.read().from("/tmp/test"); |
| assertEquals(AUTO, read.getCompression()); |
| read = TextIO.read().from("/tmp/test").withCompression(GZIP); |
| assertEquals(GZIP, read.getCompression()); |
| } |
| |
| /** |
| * Tests reading from a small, uncompressed file with .gz extension. This must work in GZIP |
| * modes. This is needed because some network file systems / HTTP clients will transparently |
| * decompress gzipped content. |
| */ |
| @Test |
| @Category(NeedsRunner.class) |
| public void testSmallCompressedGzipReadActuallyUncompressed() throws Exception { |
| File smallGzNotCompressed = |
| writeToFile(TINY, tempFolder, "tiny_uncompressed.gz", UNCOMPRESSED); |
| // Should work with GZIP compression set. |
| assertReadingCompressedFileMatchesExpected(smallGzNotCompressed, GZIP, TINY, p); |
| p.run(); |
| } |
| |
| /** |
| * Tests reading from a small, uncompressed file with .gz extension. This must work in AUTO |
| * modes. This is needed because some network file systems / HTTP clients will transparently |
| * decompress gzipped content. |
| */ |
| @Test |
| @Category(NeedsRunner.class) |
| public void testSmallCompressedAutoReadActuallyUncompressed() throws Exception { |
| File smallGzNotCompressed = |
| writeToFile(TINY, tempFolder, "tiny_uncompressed.gz", UNCOMPRESSED); |
| // Should also work with AUTO mode set. |
| assertReadingCompressedFileMatchesExpected(smallGzNotCompressed, AUTO, TINY, p); |
| p.run(); |
| } |
| |
| /** |
| * Tests a zip file with no entries. This is a corner case not tested elsewhere as the default |
| * test zip files have a single entry. |
| */ |
| @Test |
| @Category(NeedsRunner.class) |
| public void testZipCompressedReadWithNoEntries() throws Exception { |
| File file = createZipFile(new ArrayList<>(), tempFolder, "empty zip file"); |
| assertReadingCompressedFileMatchesExpected(file, ZIP, EMPTY, p); |
| p.run(); |
| } |
| |
| /** |
| * Tests a zip file with multiple entries. This is a corner case not tested elsewhere as the |
| * default test zip files have a single entry. |
| */ |
| @Test |
| @Category(NeedsRunner.class) |
| public void testZipCompressedReadWithMultiEntriesFile() throws Exception { |
| String[] entry0 = new String[] {"first", "second", "three"}; |
| String[] entry1 = new String[] {"four", "five", "six"}; |
| String[] entry2 = new String[] {"seven", "eight", "nine"}; |
| |
| List<String> expected = new ArrayList<>(); |
| |
| File file = createZipFile(expected, tempFolder, "multiple entries", entry0, entry1, entry2); |
| assertReadingCompressedFileMatchesExpected(file, ZIP, expected, p); |
| p.run(); |
| } |
| |
| /** |
| * Read a ZIP compressed file containing data, multiple empty entries, and then more data. We |
| * expect just the data back. |
| */ |
| @Test |
| @Category(NeedsRunner.class) |
| public void testZipCompressedReadWithComplexEmptyAndPresentEntries() throws Exception { |
| File file = |
| createZipFile( |
| new ArrayList<>(), |
| tempFolder, |
| "complex empty and present entries", |
| new String[] {"cat"}, |
| new String[] {}, |
| new String[] {}, |
| new String[] {"dog"}); |
| |
| assertReadingCompressedFileMatchesExpected(file, ZIP, Arrays.asList("cat", "dog"), p); |
| p.run(); |
| } |
| |
| @Test |
| public void testTextIOGetName() { |
| assertEquals("TextIO.Read", TextIO.read().from("somefile").getName()); |
| assertEquals("TextIO.Read", TextIO.read().from("somefile").toString()); |
| } |
| |
| private TextSource prepareSource(byte[] data) throws IOException { |
| return TextIOReadTest.prepareSource(tempFolder, data, null); |
| } |
| |
| @Test |
| public void testProgressEmptyFile() throws IOException { |
| try (BoundedSource.BoundedReader<String> reader = |
| prepareSource(new byte[0]).createReader(PipelineOptionsFactory.create())) { |
| // Check preconditions before starting. |
| assertEquals(0.0, reader.getFractionConsumed(), 1e-6); |
| assertEquals(0, reader.getSplitPointsConsumed()); |
| assertEquals( |
| BoundedSource.BoundedReader.SPLIT_POINTS_UNKNOWN, reader.getSplitPointsRemaining()); |
| |
| // Assert empty |
| assertFalse(reader.start()); |
| |
| // Check postconditions after finishing |
| assertEquals(1.0, reader.getFractionConsumed(), 1e-6); |
| assertEquals(0, reader.getSplitPointsConsumed()); |
| assertEquals(0, reader.getSplitPointsRemaining()); |
| } |
| } |
| |
| @Test |
| public void testProgressTextFile() throws IOException { |
| String file = "line1\nline2\nline3"; |
| try (BoundedSource.BoundedReader<String> reader = |
| prepareSource(file.getBytes(Charsets.UTF_8)) |
| .createReader(PipelineOptionsFactory.create())) { |
| // Check preconditions before starting |
| assertEquals(0.0, reader.getFractionConsumed(), 1e-6); |
| assertEquals(0, reader.getSplitPointsConsumed()); |
| assertEquals( |
| BoundedSource.BoundedReader.SPLIT_POINTS_UNKNOWN, reader.getSplitPointsRemaining()); |
| |
| // Line 1 |
| assertTrue(reader.start()); |
| assertEquals(0, reader.getSplitPointsConsumed()); |
| assertEquals( |
| BoundedSource.BoundedReader.SPLIT_POINTS_UNKNOWN, reader.getSplitPointsRemaining()); |
| |
| // Line 2 |
| assertTrue(reader.advance()); |
| assertEquals(1, reader.getSplitPointsConsumed()); |
| assertEquals( |
| BoundedSource.BoundedReader.SPLIT_POINTS_UNKNOWN, reader.getSplitPointsRemaining()); |
| |
| // Line 3 |
| assertTrue(reader.advance()); |
| assertEquals(2, reader.getSplitPointsConsumed()); |
| assertEquals(1, reader.getSplitPointsRemaining()); |
| |
| // Check postconditions after finishing |
| assertFalse(reader.advance()); |
| assertEquals(1.0, reader.getFractionConsumed(), 1e-6); |
| assertEquals(3, reader.getSplitPointsConsumed()); |
| assertEquals(0, reader.getSplitPointsRemaining()); |
| } |
| } |
| |
| @Test |
| public void testProgressAfterSplitting() throws IOException { |
| String file = "line1\nline2\nline3"; |
| BoundedSource<String> source = prepareSource(file.getBytes(Charsets.UTF_8)); |
| BoundedSource<String> remainder; |
| |
| // Create the remainder, verifying properties pre- and post-splitting. |
| try (BoundedSource.BoundedReader<String> readerOrig = |
| source.createReader(PipelineOptionsFactory.create())) { |
| // Preconditions. |
| assertEquals(0.0, readerOrig.getFractionConsumed(), 1e-6); |
| assertEquals(0, readerOrig.getSplitPointsConsumed()); |
| assertEquals( |
| BoundedSource.BoundedReader.SPLIT_POINTS_UNKNOWN, readerOrig.getSplitPointsRemaining()); |
| |
| // First record, before splitting. |
| assertTrue(readerOrig.start()); |
| assertEquals(0, readerOrig.getSplitPointsConsumed()); |
| assertEquals( |
| BoundedSource.BoundedReader.SPLIT_POINTS_UNKNOWN, readerOrig.getSplitPointsRemaining()); |
| |
| // Split. 0.1 is in line1, so should now be able to detect last record. |
| remainder = readerOrig.splitAtFraction(0.1); |
| System.err.println(readerOrig.getCurrentSource()); |
| assertNotNull(remainder); |
| |
| // First record, after splitting. |
| assertEquals(0, readerOrig.getSplitPointsConsumed()); |
| assertEquals(1, readerOrig.getSplitPointsRemaining()); |
| |
| // Finish and postconditions. |
| assertFalse(readerOrig.advance()); |
| assertEquals(1.0, readerOrig.getFractionConsumed(), 1e-6); |
| assertEquals(1, readerOrig.getSplitPointsConsumed()); |
| assertEquals(0, readerOrig.getSplitPointsRemaining()); |
| } |
| |
| // Check the properties of the remainder. |
| try (BoundedSource.BoundedReader<String> reader = |
| remainder.createReader(PipelineOptionsFactory.create())) { |
| // Preconditions. |
| assertEquals(0.0, reader.getFractionConsumed(), 1e-6); |
| assertEquals(0, reader.getSplitPointsConsumed()); |
| assertEquals( |
| BoundedSource.BoundedReader.SPLIT_POINTS_UNKNOWN, reader.getSplitPointsRemaining()); |
| |
| // First record should be line 2. |
| assertTrue(reader.start()); |
| assertEquals(0, reader.getSplitPointsConsumed()); |
| assertEquals( |
| BoundedSource.BoundedReader.SPLIT_POINTS_UNKNOWN, reader.getSplitPointsRemaining()); |
| |
| // Second record is line 3 |
| assertTrue(reader.advance()); |
| assertEquals(1, reader.getSplitPointsConsumed()); |
| assertEquals(1, reader.getSplitPointsRemaining()); |
| |
| // Check postconditions after finishing |
| assertFalse(reader.advance()); |
| assertEquals(1.0, reader.getFractionConsumed(), 1e-6); |
| assertEquals(2, reader.getSplitPointsConsumed()); |
| assertEquals(0, reader.getSplitPointsRemaining()); |
| } |
| } |
| |
| @Test |
| public void testInitialSplitAutoModeTxt() throws Exception { |
| PipelineOptions options = TestPipeline.testingPipelineOptions(); |
| long desiredBundleSize = 1000; |
| File largeTxt = writeToFile(LARGE, tempFolder, "large.txt", UNCOMPRESSED); |
| |
| // Sanity check: file is at least 2 bundles long. |
| assertThat(largeTxt.length(), greaterThan(2 * desiredBundleSize)); |
| |
| FileBasedSource<String> source = TextIO.read().from(largeTxt.getPath()).getSource(); |
| List<? extends FileBasedSource<String>> splits = source.split(desiredBundleSize, options); |
| |
| // At least 2 splits and they are equal to reading the whole file. |
| assertThat(splits, hasSize(greaterThan(1))); |
| SourceTestUtils.assertSourcesEqualReferenceSource(source, splits, options); |
| } |
| |
| @Test |
| public void testInitialSplitAutoModeGz() throws Exception { |
| // TODO: Java core test failing on windows, https://issues.apache.org/jira/browse/BEAM-10746 |
| assumeFalse(SystemUtils.IS_OS_WINDOWS); |
| PipelineOptions options = TestPipeline.testingPipelineOptions(); |
| long desiredBundleSize = 1000; |
| File largeGz = writeToFile(LARGE, tempFolder, "large.gz", GZIP); |
| // Sanity check: file is at least 2 bundles long. |
| assertThat(largeGz.length(), greaterThan(2 * desiredBundleSize)); |
| |
| FileBasedSource<String> source = TextIO.read().from(largeGz.getPath()).getSource(); |
| List<? extends FileBasedSource<String>> splits = source.split(desiredBundleSize, options); |
| |
| // Exactly 1 split, even in AUTO mode, since it is a gzip file. |
| assertThat(splits, hasSize(equalTo(1))); |
| SourceTestUtils.assertSourcesEqualReferenceSource(source, splits, options); |
| } |
| |
| @Test |
| public void testInitialSplitGzipModeTxt() throws Exception { |
| PipelineOptions options = TestPipeline.testingPipelineOptions(); |
| long desiredBundleSize = 1000; |
| File largeTxt = writeToFile(LARGE, tempFolder, "large.txt", UNCOMPRESSED); |
| // Sanity check: file is at least 2 bundles long. |
| assertThat(largeTxt.length(), greaterThan(2 * desiredBundleSize)); |
| |
| FileBasedSource<String> source = |
| TextIO.read().from(largeTxt.getPath()).withCompression(GZIP).getSource(); |
| List<? extends FileBasedSource<String>> splits = source.split(desiredBundleSize, options); |
| |
| // Exactly 1 split, even though splittable text file, since using GZIP mode. |
| assertThat(splits, hasSize(equalTo(1))); |
| SourceTestUtils.assertSourcesEqualReferenceSource(source, splits, options); |
| } |
| |
| @Test |
| @Category(NeedsRunner.class) |
| public void testReadAll() throws IOException { |
| Path tempFolderPath = tempFolder.getRoot().toPath(); |
| writeToFile(TINY, tempFolder, "readAllTiny1.zip", ZIP); |
| writeToFile(TINY, tempFolder, "readAllTiny2.txt", UNCOMPRESSED); |
| writeToFile(LARGE, tempFolder, "readAllLarge1.zip", ZIP); |
| writeToFile(LARGE, tempFolder, "readAllLarge2.txt", UNCOMPRESSED); |
| PCollection<String> lines = |
| p.apply( |
| Create.of( |
| tempFolderPath.resolve("readAllTiny*").toString(), |
| tempFolderPath.resolve("readAllLarge*").toString())) |
| .apply(TextIO.readAll().withCompression(AUTO)); |
| PAssert.that(lines).containsInAnyOrder(Iterables.concat(TINY, TINY, LARGE, LARGE)); |
| p.run(); |
| } |
| |
| @Test |
| @Category(NeedsRunner.class) |
| public void testReadFiles() throws IOException { |
| Path tempFolderPath = tempFolder.getRoot().toPath(); |
| writeToFile(TINY, tempFolder, "readAllTiny1.zip", ZIP); |
| writeToFile(TINY, tempFolder, "readAllTiny2.txt", UNCOMPRESSED); |
| writeToFile(LARGE, tempFolder, "readAllLarge1.zip", ZIP); |
| writeToFile(LARGE, tempFolder, "readAllLarge2.txt", UNCOMPRESSED); |
| PCollection<String> lines = |
| p.apply( |
| Create.of( |
| tempFolderPath.resolve("readAllTiny*").toString(), |
| tempFolderPath.resolve("readAllLarge*").toString())) |
| .apply(FileIO.matchAll()) |
| .apply(FileIO.readMatches().withCompression(AUTO)) |
| .apply(TextIO.readFiles().withDesiredBundleSizeBytes(10)); |
| PAssert.that(lines).containsInAnyOrder(Iterables.concat(TINY, TINY, LARGE, LARGE)); |
| p.run(); |
| } |
| |
| @Test |
| @Category({NeedsRunner.class, UsesUnboundedSplittableParDo.class}) |
| public void testReadWatchForNewFiles() throws IOException, InterruptedException { |
| final Path basePath = tempFolder.getRoot().toPath().resolve("readWatch"); |
| basePath.toFile().mkdir(); |
| |
| p.apply(GenerateSequence.from(0).to(10).withRate(1, Duration.millis(100))) |
| .apply( |
| Window.<Long>into(FixedWindows.of(Duration.millis(150))) |
| .withAllowedLateness(Duration.ZERO) |
| .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1))) |
| .discardingFiredPanes()) |
| .apply(ToString.elements()) |
| .apply( |
| TextIO.write() |
| .to(basePath.resolve("data").toString()) |
| .withNumShards(1) |
| .withWindowedWrites()); |
| |
| PCollection<String> lines = |
| p.apply( |
| TextIO.read() |
| .from(basePath.resolve("*").toString()) |
| .watchForNewFiles( |
| Duration.millis(100), |
| Watch.Growth.afterTimeSinceNewOutput(Duration.standardSeconds(3)))); |
| |
| PAssert.that(lines).containsInAnyOrder("0", "1", "2", "3", "4", "5", "6", "7", "8", "9"); |
| p.run(); |
| } |
| } |
| |
| /** Tests for TextSource class. */ |
| @RunWith(JUnit4.class) |
| public static class TextSourceTest { |
| @Rule public transient TestPipeline pipeline = TestPipeline.create(); |
| |
| @Test |
| @Category(NeedsRunner.class) |
| public void testRemoveUtf8BOM() throws Exception { |
| Path p1 = createTestFile("test_txt_ascii", Charset.forName("US-ASCII"), "1,p1", "2,p1"); |
| Path p2 = |
| createTestFile( |
| "test_txt_utf8_no_bom", |
| Charset.forName("UTF-8"), |
| "1,p2-Japanese:テスト", |
| "2,p2-Japanese:テスト"); |
| Path p3 = |
| createTestFile( |
| "test_txt_utf8_bom", |
| Charset.forName("UTF-8"), |
| "\uFEFF1,p3-テストBOM", |
| "\uFEFF2,p3-テストBOM"); |
| PCollection<String> contents = |
| pipeline |
| .apply("Create", Create.of(p1.toString(), p2.toString(), p3.toString())) |
| .setCoder(StringUtf8Coder.of()) |
| // PCollection<String> |
| .apply("Read file", new TextIOReadTest.TextSourceTest.TextFileReadTransform()); |
| // PCollection<KV<String, String>>: tableName, line |
| |
| // Validate that the BOM bytes (\uFEFF) at the beginning of the first line have been removed. |
| PAssert.that(contents) |
| .containsInAnyOrder( |
| "1,p1", |
| "2,p1", |
| "1,p2-Japanese:テスト", |
| "2,p2-Japanese:テスト", |
| "1,p3-テストBOM", |
| "\uFEFF2,p3-テストBOM"); |
| |
| pipeline.run(); |
| } |
| |
| @Test |
| @Category(NeedsRunner.class) |
| public void testPreserveNonBOMBytes() throws Exception { |
| // Contains \uFEFE, not UTF BOM. |
| Path p1 = |
| createTestFile( |
| "test_txt_utf_bom", Charset.forName("UTF-8"), "\uFEFE1,p1テスト", "\uFEFE2,p1テスト"); |
| PCollection<String> contents = |
| pipeline |
| .apply("Create", Create.of(p1.toString())) |
| .setCoder(StringUtf8Coder.of()) |
| // PCollection<String> |
| .apply("Read file", new TextIOReadTest.TextSourceTest.TextFileReadTransform()); |
| |
| PAssert.that(contents).containsInAnyOrder("\uFEFE1,p1テスト", "\uFEFE2,p1テスト"); |
| |
| pipeline.run(); |
| } |
| |
| private static class FileReadDoFn extends DoFn<FileIO.ReadableFile, String> { |
| |
| @ProcessElement |
| public void processElement(ProcessContext c) { |
| FileIO.ReadableFile file = c.element(); |
| ValueProvider<String> filenameProvider = |
| ValueProvider.StaticValueProvider.of(file.getMetadata().resourceId().getFilename()); |
| // Create a TextSource, passing null as the delimiter to use the default |
| // delimiters ('\n', '\r', or '\r\n'). |
| TextSource textSource = new TextSource(filenameProvider, null, null); |
| try { |
| BoundedSource.BoundedReader<String> reader = |
| textSource |
| .createForSubrangeOfFile(file.getMetadata(), 0, file.getMetadata().sizeBytes()) |
| .createReader(c.getPipelineOptions()); |
| for (boolean more = reader.start(); more; more = reader.advance()) { |
| c.output(reader.getCurrent()); |
| } |
| } catch (IOException e) { |
| throw new RuntimeException( |
| "Unable to readFile: " + file.getMetadata().resourceId().toString()); |
| } |
| } |
| } |
| |
| /** A transform that reads CSV file records. */ |
| private static class TextFileReadTransform |
| extends PTransform<PCollection<String>, PCollection<String>> { |
| public TextFileReadTransform() {} |
| |
| @Override |
| public PCollection<String> expand(PCollection<String> files) { |
| return files |
| // PCollection<String> |
| .apply(FileIO.matchAll().withEmptyMatchTreatment(EmptyMatchTreatment.DISALLOW)) |
| // PCollection<Match.Metadata> |
| .apply(FileIO.readMatches()) |
| // PCollection<FileIO.ReadableFile> |
| .apply("Read lines", ParDo.of(new TextIOReadTest.TextSourceTest.FileReadDoFn())); |
| // PCollection<String>: line |
| } |
| } |
| |
| private Path createTestFile(String filename, Charset charset, String... lines) |
| throws IOException { |
| Path path = Files.createTempFile(filename, ".csv"); |
| try (BufferedWriter writer = Files.newBufferedWriter(path, charset)) { |
| for (String line : lines) { |
| writer.write(line); |
| writer.write('\n'); |
| } |
| } |
| return path; |
| } |
| } |
| } |