[BEAM-167] Fix custom source gzip input to read concatenated gzip files

This applies patch from kirpichov@google.com from https://gist.github.com/jkff/d8d984a33a41ec607328cee8e418c174
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/CompressedSource.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/CompressedSource.java
index e3dca91..b0636ea 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/CompressedSource.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/CompressedSource.java
@@ -121,7 +121,7 @@
           byte zero = 0x00;
           int header = Ints.fromBytes(zero, zero, headerBytes[1], headerBytes[0]);
           if (header == GZIPInputStream.GZIP_MAGIC) {
-            return Channels.newChannel(new GzipCompressorInputStream(stream));
+            return Channels.newChannel(new GzipCompressorInputStream(stream, true));
           }
         }
         return Channels.newChannel(stream);
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/CompressedSourceTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/CompressedSourceTest.java
index 14c8fe9..86c39a4 100644
--- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/CompressedSourceTest.java
+++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/CompressedSourceTest.java
@@ -46,16 +46,19 @@
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
+import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.nio.channels.ReadableByteChannel;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.Random;
+import java.util.zip.GZIPOutputStream;
 
 import javax.annotation.Nullable;
 
@@ -97,6 +100,43 @@
     runReadTest(input, CompressionMode.GZIP);
   }
 
+  private static byte[] compressGzip(byte[] input) throws IOException {
+    ByteArrayOutputStream res = new ByteArrayOutputStream();
+    try (GZIPOutputStream gzipStream = new GZIPOutputStream(res)) {
+      gzipStream.write(input);
+    }
+    return res.toByteArray();
+  }
+
+  private static byte[] concat(byte[] first, byte[] second) {
+    byte[] res = new byte[first.length + second.length];
+    System.arraycopy(first, 0, res, 0, first.length);
+    System.arraycopy(second, 0, res, first.length, second.length);
+    return res;
+  }
+
+  @Test
+  public void testReadConcatenatedGzip() throws IOException {
+    byte[] header = "a,b,c\n".getBytes(StandardCharsets.UTF_8);
+    byte[] body = "1,2,3\n4,5,6\n7,8,9\n".getBytes(StandardCharsets.UTF_8);
+    byte[] expected = concat(header, body);
+    byte[] totalGz = concat(compressGzip(header), compressGzip(body));
+    File tmpFile = tmpFolder.newFile();
+    try (FileOutputStream os = new FileOutputStream(tmpFile)) {
+      os.write(totalGz);
+    }
+
+    Pipeline p = TestPipeline.create();
+
+    CompressedSource<Byte> source =
+        CompressedSource.from(new ByteSource(tmpFile.getAbsolutePath(), 1))
+            .withDecompression(CompressionMode.GZIP);
+    PCollection<Byte> output = p.apply(Read.from(source));
+
+    DataflowAssert.that(output).containsInAnyOrder(Bytes.asList(expected));
+    p.run();
+  }
+
   /**
    * Test reading empty input with bzip2.
    */