[BEAM-167] Fix custom source gzip input to read concatenated gzip files
This applies patch from kirpichov@google.com from https://gist.github.com/jkff/d8d984a33a41ec607328cee8e418c174
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/CompressedSource.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/CompressedSource.java
index e3dca91..b0636ea 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/CompressedSource.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/CompressedSource.java
@@ -121,7 +121,7 @@
byte zero = 0x00;
int header = Ints.fromBytes(zero, zero, headerBytes[1], headerBytes[0]);
if (header == GZIPInputStream.GZIP_MAGIC) {
- return Channels.newChannel(new GzipCompressorInputStream(stream));
+ return Channels.newChannel(new GzipCompressorInputStream(stream, true));
}
}
return Channels.newChannel(stream);
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/CompressedSourceTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/CompressedSourceTest.java
index 14c8fe9..86c39a4 100644
--- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/CompressedSourceTest.java
+++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/CompressedSourceTest.java
@@ -46,16 +46,19 @@
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
+import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Random;
+import java.util.zip.GZIPOutputStream;
import javax.annotation.Nullable;
@@ -97,6 +100,43 @@
runReadTest(input, CompressionMode.GZIP);
}
+ private static byte[] compressGzip(byte[] input) throws IOException {
+ ByteArrayOutputStream res = new ByteArrayOutputStream();
+ try (GZIPOutputStream gzipStream = new GZIPOutputStream(res)) {
+ gzipStream.write(input);
+ }
+ return res.toByteArray();
+ }
+
+ private static byte[] concat(byte[] first, byte[] second) {
+ byte[] res = new byte[first.length + second.length];
+ System.arraycopy(first, 0, res, 0, first.length);
+ System.arraycopy(second, 0, res, first.length, second.length);
+ return res;
+ }
+
+ @Test
+ public void testReadConcatenatedGzip() throws IOException {
+ byte[] header = "a,b,c\n".getBytes(StandardCharsets.UTF_8);
+ byte[] body = "1,2,3\n4,5,6\n7,8,9\n".getBytes(StandardCharsets.UTF_8);
+ byte[] expected = concat(header, body);
+ byte[] totalGz = concat(compressGzip(header), compressGzip(body));
+ File tmpFile = tmpFolder.newFile();
+ try (FileOutputStream os = new FileOutputStream(tmpFile)) {
+ os.write(totalGz);
+ }
+
+ Pipeline p = TestPipeline.create();
+
+ CompressedSource<Byte> source =
+ CompressedSource.from(new ByteSource(tmpFile.getAbsolutePath(), 1))
+ .withDecompression(CompressionMode.GZIP);
+ PCollection<Byte> output = p.apply(Read.from(source));
+
+ DataflowAssert.that(output).containsInAnyOrder(Bytes.asList(expected));
+ p.run();
+ }
+
/**
* Test reading empty input with bzip2.
*/