NIFI-388: Refactored to make compression codec flexible
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/CompressedOutputStream.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/CompressedOutputStream.java
new file mode 100644
index 0000000..618dd88
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/CompressedOutputStream.java
@@ -0,0 +1,36 @@
+/*

+ * Licensed to the Apache Software Foundation (ASF) under one or more

+ * contributor license agreements.  See the NOTICE file distributed with

+ * this work for additional information regarding copyright ownership.

+ * The ASF licenses this file to You under the Apache License, Version 2.0

+ * (the "License"); you may not use this file except in compliance with

+ * the License.  You may obtain a copy of the License at

+ *

+ *     http://www.apache.org/licenses/LICENSE-2.0

+ *

+ * Unless required by applicable law or agreed to in writing, software

+ * distributed under the License is distributed on an "AS IS" BASIS,

+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

+ * See the License for the specific language governing permissions and

+ * limitations under the License.

+ */

+package org.apache.nifi.provenance.journaling.journals;

+

+import java.io.IOException;

+import java.io.OutputStream;

+

+public abstract class CompressedOutputStream extends OutputStream {

+

+    /**

+     * Begins a new compression block

+     * @throws IOException if an I/O error occurs while beginning the block

+     */

+    public abstract void beginNewBlock() throws IOException;

+

+    /**

+     * Ends the current compression block

+     * @throws IOException if an I/O error occurs while finishing the block

+     */

+    public abstract void finishBlock() throws IOException;

+    

+}

diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/CompressionCodec.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/CompressionCodec.java
new file mode 100644
index 0000000..f6e856e
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/CompressionCodec.java
@@ -0,0 +1,45 @@
+/*

+ * Licensed to the Apache Software Foundation (ASF) under one or more

+ * contributor license agreements.  See the NOTICE file distributed with

+ * this work for additional information regarding copyright ownership.

+ * The ASF licenses this file to You under the Apache License, Version 2.0

+ * (the "License"); you may not use this file except in compliance with

+ * the License.  You may obtain a copy of the License at

+ *

+ *     http://www.apache.org/licenses/LICENSE-2.0

+ *

+ * Unless required by applicable law or agreed to in writing, software

+ * distributed under the License is distributed on an "AS IS" BASIS,

+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

+ * See the License for the specific language governing permissions and

+ * limitations under the License.

+ */

+package org.apache.nifi.provenance.journaling.journals;

+

+import java.io.IOException;

+import java.io.InputStream;

+import java.io.OutputStream;

+

+public interface CompressionCodec {

+    /**

+     * Returns the name of the compression codec

+     * @return the name of the compression codec

+     */

+    String getName();

+    

+    /**

+     * Wraps the given OutputStream so that data written will be compressed

+     * @param out the OutputStream to which compressed data should be written

+     * @return a CompressedOutputStream that compresses the data written to it

+     * @throws IOException

+     */

+    CompressedOutputStream newCompressionOutputStream(OutputStream out) throws IOException;

+    

+    /**

+     * Wraps the given InputStream so that data read will be decompressed

+     * @param in the InputStream from which compressed data is read

+     * @return an InputStream that decompresses the data as it is read

+     * @throws IOException

+     */

+    InputStream newCompressionInputStream(InputStream in) throws IOException;

+}

diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/DeflatorCompressionCodec.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/DeflatorCompressionCodec.java
new file mode 100644
index 0000000..b9f2959
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/DeflatorCompressionCodec.java
@@ -0,0 +1,113 @@
+/*

+ * Licensed to the Apache Software Foundation (ASF) under one or more

+ * contributor license agreements.  See the NOTICE file distributed with

+ * this work for additional information regarding copyright ownership.

+ * The ASF licenses this file to You under the Apache License, Version 2.0

+ * (the "License"); you may not use this file except in compliance with

+ * the License.  You may obtain a copy of the License at

+ *

+ *     http://www.apache.org/licenses/LICENSE-2.0

+ *

+ * Unless required by applicable law or agreed to in writing, software

+ * distributed under the License is distributed on an "AS IS" BASIS,

+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

+ * See the License for the specific language governing permissions and

+ * limitations under the License.

+ */

+package org.apache.nifi.provenance.journaling.journals;

+

+import java.io.IOException;

+import java.io.InputStream;

+import java.io.OutputStream;

+

+import org.apache.nifi.remote.io.CompressionInputStream;

+import org.apache.nifi.remote.io.CompressionOutputStream;

+

+public class DeflatorCompressionCodec implements CompressionCodec {

+    public static final String DEFLATOR_COMPRESSION_CODEC = "deflator-compression-codec";

+    

+    @Override

+    public String getName() {

+        return DEFLATOR_COMPRESSION_CODEC;

+    }

+

+    @Override

+    public CompressedOutputStream newCompressionOutputStream(final OutputStream out) throws IOException {

+        return new DeflatorOutputStream(out);

+    }

+

+    @Override

+    public InputStream newCompressionInputStream(final InputStream in) throws IOException {

+        return new CompressionInputStream(in);

+    }

+

+    

+    private static class DeflatorOutputStream extends CompressedOutputStream {

+        private final OutputStream originalOut;

+        private CompressionOutputStream compressionOutput;

+        

+        public DeflatorOutputStream(final OutputStream out) {

+            this.originalOut = out;

+        }

+        

+        private void verifyState() {

+            if ( compressionOutput == null ) {

+                throw new IllegalStateException("No Compression Block has been created");

+            }

+        }

+        

+        @Override

+        public void write(final int b) throws IOException {

+            verifyState();

+            compressionOutput.write(b);

+        }

+        

+        @Override

+        public void write(final byte[] b) throws IOException {

+            verifyState();

+            compressionOutput.write(b);

+        }

+        

+        @Override

+        public void write(final byte[] b, final int off, final int len) throws IOException {

+            verifyState();

+            compressionOutput.write(b, off, len);

+        }

+        

+        @Override

+        public void flush() throws IOException {

+            if ( compressionOutput != null ) {

+                compressionOutput.flush();

+            }

+        }

+        

+        @Override

+        public void close() throws IOException {

+            if ( compressionOutput != null ) {

+                compressionOutput.close();

+            }

+            

+            originalOut.close();

+        }

+        

+        @Override

+        public void beginNewBlock() throws IOException {

+            compressionOutput = new CompressionOutputStream(originalOut);

+        }

+        

+        @Override

+        public void finishBlock() throws IOException {

+            // Calling close() on CompressionOutputStream doesn't close the underlying stream -- it is designed

+            // such that calling close() will write out the Compression footer and become unusable but not

+            // close the underlying stream because the whole point of CompressionOutputStream as opposed to

+            // GZIPOutputStream is that with CompressionOutputStream we can concatenate many together on a single

+            // stream.

+            if ( compressionOutput == null ) {

+                return;

+            } else {

+                compressionOutput.close();

+            }

+        }

+    }

+    

+}

diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalReader.java
index 13878f8..9a937b4 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalReader.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalReader.java
@@ -27,7 +27,6 @@
 import org.apache.nifi.provenance.ProvenanceEventRecord;

 import org.apache.nifi.provenance.journaling.io.Deserializer;

 import org.apache.nifi.provenance.journaling.io.Deserializers;

-import org.apache.nifi.remote.io.CompressionInputStream;

 import org.apache.nifi.stream.io.ByteCountingInputStream;

 import org.apache.nifi.stream.io.LimitingInputStream;

 import org.apache.nifi.stream.io.MinimumLengthInputStream;

@@ -49,7 +48,7 @@
     

     private Deserializer deserializer;

     private int serializationVersion;

-    private boolean compressed;

+    private CompressionCodec compressionCodec = null;

     

     private long lastEventIdRead = -1L;

     

@@ -68,7 +67,15 @@
             StandardJournalMagicHeader.read(dis);

             final String codecName = dis.readUTF();

             serializationVersion = dis.readInt();

-            compressed = dis.readBoolean();

+            final boolean compressed = dis.readBoolean();

+            if ( compressed ) {

+                final String compressionCodecName = dis.readUTF();

+                if ( DeflatorCompressionCodec.DEFLATOR_COMPRESSION_CODEC.equals(compressionCodecName) ) {

+                    compressionCodec = new DeflatorCompressionCodec();

+                } else {

+                    throw new IOException(file + " is compressed using unknown Compression Codec " + compressionCodecName);

+                }

+            }

             deserializer = Deserializers.getDeserializer(codecName);

             

             resetDecompressedStream();

@@ -83,10 +90,10 @@
     

     

     private void resetDecompressedStream() throws IOException {

-        if ( compressed ) {

-            decompressedStream = new ByteCountingInputStream(new BufferedInputStream(new CompressionInputStream(compressedStream)), compressedStream.getBytesConsumed());

-        } else {

+        if ( compressionCodec == null ) {

             decompressedStream = compressedStream;

+        } else {

+            decompressedStream = new ByteCountingInputStream(new BufferedInputStream(compressionCodec.newCompressionInputStream(compressedStream)), compressedStream.getBytesConsumed());

         }

     }

     

@@ -129,7 +136,7 @@
             

             // we are allowed to span blocks. We're out of data but if we are compressed, it could

             // just mean that the block has ended.

-            if ( !compressed ) {

+            if ( compressionCodec == null ) {

                 return null;

             }

             

diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalWriter.java
index a9cb361..d18b05b 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalWriter.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalWriter.java
@@ -32,7 +32,6 @@
 

 import org.apache.nifi.provenance.ProvenanceEventRecord;

 import org.apache.nifi.provenance.journaling.io.Serializer;

-import org.apache.nifi.remote.io.CompressionOutputStream;

 import org.apache.nifi.stream.io.BufferedOutputStream;

 import org.apache.nifi.stream.io.ByteArrayOutputStream;

 import org.apache.nifi.stream.io.ByteCountingOutputStream;

@@ -67,11 +66,13 @@
  * 

  * Where <header> is defined as:

  * <pre>

+ *  magic header "NiFiProvJournal_1"

  *  String: serialization codec name (retrieved from serializer)

  *      --> 2 bytes for length of string

  *      --> N bytes for actual serialization codec name

  *  int: serialization version

  *  boolean: compressed: 1 -> compressed, 0 -> not compressed

+ *  String: if compressed, name of compression codec; otherwise, not present

  * </pre>

  * 

  * And &lt;record&gt; is defined as:

@@ -94,7 +95,7 @@
     

     private final long journalId;

     private final File journalFile;

-    private final boolean compressed;

+    private final CompressionCodec compressionCodec;

     private final Serializer serializer;

     private final long creationTime = System.nanoTime();

     private final String description;

@@ -111,7 +112,7 @@
     private long recordCount = 1L;

     

     

-    public StandardJournalWriter(final long journalId, final File journalFile, final boolean compressed, final Serializer serializer) throws IOException {

+    public StandardJournalWriter(final long journalId, final File journalFile, final CompressionCodec compressionCodec, final Serializer serializer) throws IOException {

         if ( journalFile.exists() ) {

             // Check if there is actually any data here.

             try (final InputStream fis = new FileInputStream(journalFile);

@@ -133,7 +134,7 @@
         

         this.journalId = journalId;

         this.journalFile = journalFile;

-        this.compressed = compressed;

+        this.compressionCodec = compressionCodec;

         this.serializer = serializer;

         this.description = "Journal Writer for " + journalFile;

         this.fos = new FileOutputStream(journalFile);

@@ -141,8 +142,10 @@
         uncompressedStream = new ByteCountingOutputStream(fos);

         writeHeader(uncompressedStream);

         

-        if (compressed) {

-            compressedStream = new CompressionOutputStream(uncompressedStream);

+        if (compressionCodec != null) {

+            final CompressedOutputStream cos = compressionCodec.newCompressionOutputStream(uncompressedStream);

+            cos.beginNewBlock();

+            compressedStream = cos;

         } else {

             compressedStream = fos;

         }

@@ -155,7 +158,13 @@
         StandardJournalMagicHeader.write(out);

         dos.writeUTF(serializer.getCodecName());

         dos.writeInt(serializer.getVersion());

+        

+        final boolean compressed = compressionCodec != null;

         dos.writeBoolean(compressed);

+        if ( compressed ) {

+            dos.writeUTF(compressionCodec.getName());

+        }

+        

         dos.flush();

     }

     

@@ -258,6 +267,7 @@
     public long getAge(final TimeUnit timeUnit) {

         return timeUnit.convert(System.nanoTime() - creationTime, TimeUnit.NANOSECONDS);

     }

+    

 

     @Override

     public void finishBlock() throws IOException {

@@ -266,16 +276,10 @@
         }

         

         blockStarted = false;

-        if ( !compressed ) {

-            return;

+        

+        if ( compressedStream instanceof CompressedOutputStream ) {

+            ((CompressedOutputStream) compressedStream).finishBlock();

         }

-

-        // Calling close() on CompressionOutputStream doesn't close the underlying stream -- it is designed

-        // such that calling close() will write out the Compression footer and become unusable but not

-        // close the underlying stream because the whole point of CompressionOutputStream as opposed to

-        // GZIPOutputStream is that with CompressionOutputStream we can concatenate many together on a single

-        // stream.

-        compressedStream.close();

     }

     

     @Override

@@ -285,15 +289,10 @@
         }

         blockStarted = true;

         

-        if ( !compressed ) {

-            return;

+        if ( compressedStream instanceof CompressedOutputStream ) {

+            ((CompressedOutputStream) compressedStream).beginNewBlock();

+            this.out = new ByteCountingOutputStream(compressedStream, uncompressedStream.getBytesWritten());

         }

-        if ( eventCount == 0 ) {

-            return;

-        }

-        

-        this.compressedStream = new CompressionOutputStream(uncompressedStream);

-        this.out = new ByteCountingOutputStream(compressedStream, uncompressedStream.getBytesWritten());

     }

     

     @Override

diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/JournalingPartition.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/JournalingPartition.java
index bba6899..31371af 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/JournalingPartition.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/JournalingPartition.java
@@ -252,7 +252,7 @@
         

         // create new writers and reset state.

         final File journalFile = new File(journalsDir, firstEventId + JOURNAL_FILE_EXTENSION);

-        journalWriter = new StandardJournalWriter(firstEventId, journalFile, false, new StandardEventSerializer());

+        journalWriter = new StandardJournalWriter(firstEventId, journalFile, null, new StandardEventSerializer());

         try {

             tocWriter = new StandardTocWriter(QueryUtils.getTocFile(journalFile), false, config.isAlwaysSync());

             tocWriter.addBlockOffset(journalWriter.getSize());

diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/tasks/CompressionTask.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/tasks/CompressionTask.java
index fc9fb46..7977620 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/tasks/CompressionTask.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/tasks/CompressionTask.java
@@ -26,6 +26,7 @@
 

 import org.apache.nifi.provenance.ProvenanceEventRecord;

 import org.apache.nifi.provenance.journaling.io.StandardEventSerializer;

+import org.apache.nifi.provenance.journaling.journals.DeflatorCompressionCodec;

 import org.apache.nifi.provenance.journaling.journals.JournalReader;

 import org.apache.nifi.provenance.journaling.journals.JournalWriter;

 import org.apache.nifi.provenance.journaling.journals.StandardJournalReader;

@@ -153,7 +154,7 @@
             }

             

             try (final JournalReader journalReader = new StandardJournalReader(journalFile);

-                final JournalWriter compressedWriter = new StandardJournalWriter(journalId, compressedFile, true, new StandardEventSerializer());

+                final JournalWriter compressedWriter = new StandardJournalWriter(journalId, compressedFile, new DeflatorCompressionCodec(), new StandardEventSerializer());

                 final TocReader tocReader = new StandardTocReader(tocFile);

                 final TocWriter compressedTocWriter = new StandardTocWriter(compressedTocFile, true, false)) {

                 

diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestJournalReadWrite.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestJournalReadWrite.java
index f2266e2..89eace7 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestJournalReadWrite.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestJournalReadWrite.java
@@ -42,7 +42,8 @@
         final StandardEventSerializer serializer = new StandardEventSerializer();

         

         try {

-            try (final StandardJournalWriter writer = new StandardJournalWriter(journalId, journalFile, compressed, serializer)) {

+            final CompressionCodec codec = compressed ? new DeflatorCompressionCodec() : null;

+            try (final StandardJournalWriter writer = new StandardJournalWriter(journalId, journalFile, codec, serializer)) {

                 for (int block=0; block < 100; block++) {

                     writer.beginNewBlock();

                     

diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestStandardJournalReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestStandardJournalReader.java
index 9f0ba99..f29af1b 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestStandardJournalReader.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestStandardJournalReader.java
@@ -209,6 +209,8 @@
     @Test

     public void testReadFirstEventCompressed() throws IOException {

         dos.writeBoolean(true);

+        dos.writeUTF(DeflatorCompressionCodec.DEFLATOR_COMPRESSION_CODEC);

+

         writeRecords(88L, 1, true);

         

         // write data to a file so that we can read it with the journal reader

@@ -235,6 +237,8 @@
     @Test

     public void testReadManyCompressed() throws IOException {

         dos.writeBoolean(true);

+        dos.writeUTF(DeflatorCompressionCodec.DEFLATOR_COMPRESSION_CODEC);

+

         writeRecords(0, 1024, true);

         

         // write data to a file so that we can read it with the journal reader

@@ -266,6 +270,7 @@
     @Test

     public void testReadFirstEventWithBlockOffsetCompressed() throws IOException {

         dos.writeBoolean(true);

+        dos.writeUTF(DeflatorCompressionCodec.DEFLATOR_COMPRESSION_CODEC);

         writeRecords(0, 10, true);

         

         final int secondBlockOffset = baos.size();

@@ -295,6 +300,8 @@
     @Test

     public void testReadSubsequentEventWithBlockOffsetCompressed() throws IOException {

         dos.writeBoolean(true);

+        dos.writeUTF(DeflatorCompressionCodec.DEFLATOR_COMPRESSION_CODEC);

+

         writeRecords(0, 10, true);

         

         final int secondBlockOffset = baos.size();

@@ -324,6 +331,8 @@
     @Test

     public void testReadMultipleEventsWithBlockOffsetCompressed() throws IOException {

         dos.writeBoolean(true);

+        dos.writeUTF(DeflatorCompressionCodec.DEFLATOR_COMPRESSION_CODEC);

+

         writeRecords(0, 10, true);

         

         final int secondBlockOffset = baos.size();

@@ -417,6 +426,8 @@
     @Test

     public void testReadEventWithBlockOffsetThenPreviousBlockOffsetCompressed() throws IOException {

         dos.writeBoolean(true);

+        dos.writeUTF(DeflatorCompressionCodec.DEFLATOR_COMPRESSION_CODEC);

+

         final int firstBlockOffset = baos.size();

         writeRecords(0, 10, true);

         

diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestStandardJournalWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestStandardJournalWriter.java
index e8a6787..956df80 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestStandardJournalWriter.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestStandardJournalWriter.java
@@ -46,7 +46,7 @@
         try {

             assertTrue( journalFile.createNewFile() );

             

-            try (final StandardJournalWriter writer = new StandardJournalWriter(1L, journalFile, true, new StandardEventSerializer())) {

+            try (final StandardJournalWriter writer = new StandardJournalWriter(1L, journalFile, new DeflatorCompressionCodec(), new StandardEventSerializer())) {

                 

             }

         } finally {

@@ -60,11 +60,11 @@
         try {

             assertTrue( journalFile.createNewFile() );

             

-            try (final StandardJournalWriter writer = new StandardJournalWriter(1L, journalFile, true, new StandardEventSerializer())) {

+            try (final StandardJournalWriter writer = new StandardJournalWriter(1L, journalFile, new DeflatorCompressionCodec(), new StandardEventSerializer())) {

                 writer.write(Collections.singleton(TestUtil.generateEvent(1L)), 1L);

             }

             

-            try (final StandardJournalWriter writer = new StandardJournalWriter(1L, journalFile, true, new StandardEventSerializer())) {

+            try (final StandardJournalWriter writer = new StandardJournalWriter(1L, journalFile, new DeflatorCompressionCodec(), new StandardEventSerializer())) {

                 Assert.fail("StandardJournalWriter attempted to overwrite existing file");

             } catch (final FileAlreadyExistsException faee) {

                 // expected

@@ -80,7 +80,7 @@
         

         final StandardEventSerializer serializer = new StandardEventSerializer();

         try {

-            try (final StandardJournalWriter writer = new StandardJournalWriter(1L, journalFile, true, serializer)) {

+            try (final StandardJournalWriter writer = new StandardJournalWriter(1L, journalFile, new DeflatorCompressionCodec(), serializer)) {

                 writer.beginNewBlock();

                 writer.write(Collections.singleton(TestUtil.generateEvent(1L)), 1L);

                 writer.finishBlock();

@@ -100,6 +100,10 @@
             // compression flag

             assertEquals(true, dis.readBoolean());

             

+            // compression codec name

+            final String compressionCodecName = dis.readUTF();

+            assertEquals(DeflatorCompressionCodec.DEFLATOR_COMPRESSION_CODEC, compressionCodecName);

+            

             // read block start

             final CompressionInputStream decompressedIn = new CompressionInputStream(bais);

             final StandardEventDeserializer deserializer = new StandardEventDeserializer();

@@ -123,7 +127,7 @@
         

         final StandardEventSerializer serializer = new StandardEventSerializer();

         try {

-            try (final StandardJournalWriter writer = new StandardJournalWriter(1L, journalFile, true, serializer)) {

+            try (final StandardJournalWriter writer = new StandardJournalWriter(1L, journalFile, new DeflatorCompressionCodec(), serializer)) {

                 for (int i=0; i < 1024; i++) {

                     writer.beginNewBlock();

                     writer.write(Collections.singleton(TestUtil.generateEvent(1L)), 1L);

@@ -145,6 +149,8 @@
             // compression flag

             assertEquals(true, dis.readBoolean());

             

+            assertEquals(DeflatorCompressionCodec.DEFLATOR_COMPRESSION_CODEC, dis.readUTF());

+            

             // read block start

             for (int i=0; i < 1024; i++) {

                 final CompressionInputStream decompressedIn = new CompressionInputStream(bais);