NIFI-388: Refactored to make compression codec flexible
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/CompressedOutputStream.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/CompressedOutputStream.java
new file mode 100644
index 0000000..618dd88
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/CompressedOutputStream.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.journaling.journals;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+public abstract class CompressedOutputStream extends OutputStream {
+
+ /**
+ * Begins a new compression block
+ * @throws IOException
+ */
+ public abstract void beginNewBlock() throws IOException;
+
+ /**
+ * Ends the current compression block
+ * @throws IOException
+ */
+ public abstract void finishBlock() throws IOException;
+
+}
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/CompressionCodec.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/CompressionCodec.java
new file mode 100644
index 0000000..f6e856e
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/CompressionCodec.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.journaling.journals;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+public interface CompressionCodec {
+ /**
+ * Returns the name of the compression codec
+ * @return
+ */
+ String getName();
+
+ /**
+ * Wraps the given OutputStream so that data written will be compressed
+ * @param out
+ * @return
+ * @throws IOException
+ */
+ CompressedOutputStream newCompressionOutputStream(OutputStream out) throws IOException;
+
+ /**
+ * Wraps the given InputStream so that data read will be decompressed
+ * @param in
+ * @return
+ * @throws IOException
+ */
+ InputStream newCompressionInputStream(InputStream in) throws IOException;
+}
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/DeflatorCompressionCodec.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/DeflatorCompressionCodec.java
new file mode 100644
index 0000000..b9f2959
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/DeflatorCompressionCodec.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.journaling.journals;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.nifi.remote.io.CompressionInputStream;
+import org.apache.nifi.remote.io.CompressionOutputStream;
+
+public class DeflatorCompressionCodec implements CompressionCodec {
+ public static final String DEFLATOR_COMPRESSION_CODEC = "deflator-compression-codec";
+
+ @Override
+ public String getName() {
+ return DEFLATOR_COMPRESSION_CODEC;
+ }
+
+ @Override
+ public CompressedOutputStream newCompressionOutputStream(final OutputStream out) throws IOException {
+ return new DeflatorOutputStream(out);
+ }
+
+ @Override
+ public InputStream newCompressionInputStream(final InputStream in) throws IOException {
+ return new CompressionInputStream(in);
+ }
+
+
+ private static class DeflatorOutputStream extends CompressedOutputStream {
+ private final OutputStream originalOut;
+ private CompressionOutputStream compressionOutput;
+
+ public DeflatorOutputStream(final OutputStream out) {
+ this.originalOut = out;
+ }
+
+ private void verifyState() {
+ if ( compressionOutput == null ) {
+ throw new IllegalStateException("No Compression Block has been created");
+ }
+ }
+
+ @Override
+ public void write(final int b) throws IOException {
+ verifyState();
+ compressionOutput.write(b);
+ }
+
+ @Override
+ public void write(final byte[] b) throws IOException {
+ verifyState();
+ compressionOutput.write(b);
+ }
+
+ @Override
+ public void write(final byte[] b, final int off, final int len) throws IOException {
+ verifyState();
+ compressionOutput.write(b, off, len);
+ }
+
+ @Override
+ public void flush() throws IOException {
+ if ( compressionOutput != null ) {
+ compressionOutput.flush();
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ if ( compressionOutput != null ) {
+ compressionOutput.close();
+ }
+
+ originalOut.close();
+ }
+
+ @Override
+ public void beginNewBlock() throws IOException {
+ compressionOutput = new CompressionOutputStream(originalOut);
+ }
+
+ @Override
+ public void finishBlock() throws IOException {
+ // Calling close() on CompressionOutputStream doesn't close the underlying stream -- it is designed
+ // such that calling close() will write out the Compression footer and become unusable but not
+ // close the underlying stream because the whole point of CompressionOutputStream as opposed to
+ // GZIPOutputStream is that with CompressionOutputStream we can concatenate many together on a single
+ // stream.
+ if ( compressionOutput == null ) {
+ return;
+ } else {
+ compressionOutput.close();
+ }
+ }
+ }
+
+}
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalReader.java
index 13878f8..9a937b4 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalReader.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalReader.java
@@ -27,7 +27,6 @@
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.journaling.io.Deserializer;
import org.apache.nifi.provenance.journaling.io.Deserializers;
-import org.apache.nifi.remote.io.CompressionInputStream;
import org.apache.nifi.stream.io.ByteCountingInputStream;
import org.apache.nifi.stream.io.LimitingInputStream;
import org.apache.nifi.stream.io.MinimumLengthInputStream;
@@ -49,7 +48,7 @@
private Deserializer deserializer;
private int serializationVersion;
- private boolean compressed;
+ private CompressionCodec compressionCodec = null;
private long lastEventIdRead = -1L;
@@ -68,7 +67,15 @@
StandardJournalMagicHeader.read(dis);
final String codecName = dis.readUTF();
serializationVersion = dis.readInt();
- compressed = dis.readBoolean();
+ final boolean compressed = dis.readBoolean();
+ if ( compressed ) {
+ final String compressionCodecName = dis.readUTF();
+ if ( DeflatorCompressionCodec.DEFLATOR_COMPRESSION_CODEC.equals(compressionCodecName) ) {
+ compressionCodec = new DeflatorCompressionCodec();
+ } else {
+ throw new IOException(file + " is compressed using unknown Compression Codec " + compressionCodecName);
+ }
+ }
deserializer = Deserializers.getDeserializer(codecName);
resetDecompressedStream();
@@ -83,10 +90,10 @@
private void resetDecompressedStream() throws IOException {
- if ( compressed ) {
- decompressedStream = new ByteCountingInputStream(new BufferedInputStream(new CompressionInputStream(compressedStream)), compressedStream.getBytesConsumed());
- } else {
+ if ( compressionCodec == null ) {
decompressedStream = compressedStream;
+ } else {
+ decompressedStream = new ByteCountingInputStream(new BufferedInputStream(compressionCodec.newCompressionInputStream(compressedStream)), compressedStream.getBytesConsumed());
}
}
@@ -129,7 +136,7 @@
// we are allowed to span blocks. We're out of data but if we are compressed, it could
// just mean that the block has ended.
- if ( !compressed ) {
+ if ( compressionCodec == null ) {
return null;
}
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalWriter.java
index a9cb361..d18b05b 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalWriter.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalWriter.java
@@ -32,7 +32,6 @@
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.journaling.io.Serializer;
-import org.apache.nifi.remote.io.CompressionOutputStream;
import org.apache.nifi.stream.io.BufferedOutputStream;
import org.apache.nifi.stream.io.ByteArrayOutputStream;
import org.apache.nifi.stream.io.ByteCountingOutputStream;
@@ -67,11 +66,13 @@
*
* Where <header> is defined as:
* <pre>
+ * magic header "NiFiProvJournal_1"
* String: serialization codec name (retrieved from serializer)
* --> 2 bytes for length of string
* --> N bytes for actual serialization codec name
* int: serialization version
* boolean: compressed: 1 -> compressed, 0 -> not compressed
+ * String : if compressed, name of compression codec; otherwise, not present
* </pre>
*
* And <record> is defined as:
@@ -94,7 +95,7 @@
private final long journalId;
private final File journalFile;
- private final boolean compressed;
+ private final CompressionCodec compressionCodec;
private final Serializer serializer;
private final long creationTime = System.nanoTime();
private final String description;
@@ -111,7 +112,7 @@
private long recordCount = 1L;
- public StandardJournalWriter(final long journalId, final File journalFile, final boolean compressed, final Serializer serializer) throws IOException {
+ public StandardJournalWriter(final long journalId, final File journalFile, final CompressionCodec compressionCodec, final Serializer serializer) throws IOException {
if ( journalFile.exists() ) {
// Check if there is actually any data here.
try (final InputStream fis = new FileInputStream(journalFile);
@@ -133,7 +134,7 @@
this.journalId = journalId;
this.journalFile = journalFile;
- this.compressed = compressed;
+ this.compressionCodec = compressionCodec;
this.serializer = serializer;
this.description = "Journal Writer for " + journalFile;
this.fos = new FileOutputStream(journalFile);
@@ -141,8 +142,10 @@
uncompressedStream = new ByteCountingOutputStream(fos);
writeHeader(uncompressedStream);
- if (compressed) {
- compressedStream = new CompressionOutputStream(uncompressedStream);
+ if (compressionCodec != null) {
+ final CompressedOutputStream cos = compressionCodec.newCompressionOutputStream(uncompressedStream);
+ cos.beginNewBlock();
+ compressedStream = cos;
} else {
compressedStream = fos;
}
@@ -155,7 +158,13 @@
StandardJournalMagicHeader.write(out);
dos.writeUTF(serializer.getCodecName());
dos.writeInt(serializer.getVersion());
+
+ final boolean compressed = compressionCodec != null;
dos.writeBoolean(compressed);
+ if ( compressed ) {
+ dos.writeUTF(compressionCodec.getName());
+ }
+
dos.flush();
}
@@ -258,6 +267,7 @@
public long getAge(final TimeUnit timeUnit) {
return timeUnit.convert(System.nanoTime() - creationTime, TimeUnit.NANOSECONDS);
}
+
@Override
public void finishBlock() throws IOException {
@@ -266,16 +276,10 @@
}
blockStarted = false;
- if ( !compressed ) {
- return;
+
+ if ( compressedStream instanceof CompressedOutputStream ) {
+ ((CompressedOutputStream) compressedStream).finishBlock();
}
-
- // Calling close() on CompressionOutputStream doesn't close the underlying stream -- it is designed
- // such that calling close() will write out the Compression footer and become unusable but not
- // close the underlying stream because the whole point of CompressionOutputStream as opposed to
- // GZIPOutputStream is that with CompressionOutputStream we can concatenate many together on a single
- // stream.
- compressedStream.close();
}
@Override
@@ -285,15 +289,10 @@
}
blockStarted = true;
- if ( !compressed ) {
- return;
+ if ( compressedStream instanceof CompressedOutputStream ) {
+ ((CompressedOutputStream) compressedStream).beginNewBlock();
+ this.out = new ByteCountingOutputStream(compressedStream, uncompressedStream.getBytesWritten());
}
- if ( eventCount == 0 ) {
- return;
- }
-
- this.compressedStream = new CompressionOutputStream(uncompressedStream);
- this.out = new ByteCountingOutputStream(compressedStream, uncompressedStream.getBytesWritten());
}
@Override
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/JournalingPartition.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/JournalingPartition.java
index bba6899..31371af 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/JournalingPartition.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/JournalingPartition.java
@@ -252,7 +252,7 @@
// create new writers and reset state.
final File journalFile = new File(journalsDir, firstEventId + JOURNAL_FILE_EXTENSION);
- journalWriter = new StandardJournalWriter(firstEventId, journalFile, false, new StandardEventSerializer());
+ journalWriter = new StandardJournalWriter(firstEventId, journalFile, null, new StandardEventSerializer());
try {
tocWriter = new StandardTocWriter(QueryUtils.getTocFile(journalFile), false, config.isAlwaysSync());
tocWriter.addBlockOffset(journalWriter.getSize());
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/tasks/CompressionTask.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/tasks/CompressionTask.java
index fc9fb46..7977620 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/tasks/CompressionTask.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/tasks/CompressionTask.java
@@ -26,6 +26,7 @@
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.journaling.io.StandardEventSerializer;
+import org.apache.nifi.provenance.journaling.journals.DeflatorCompressionCodec;
import org.apache.nifi.provenance.journaling.journals.JournalReader;
import org.apache.nifi.provenance.journaling.journals.JournalWriter;
import org.apache.nifi.provenance.journaling.journals.StandardJournalReader;
@@ -153,7 +154,7 @@
}
try (final JournalReader journalReader = new StandardJournalReader(journalFile);
- final JournalWriter compressedWriter = new StandardJournalWriter(journalId, compressedFile, true, new StandardEventSerializer());
+ final JournalWriter compressedWriter = new StandardJournalWriter(journalId, compressedFile, new DeflatorCompressionCodec(), new StandardEventSerializer());
final TocReader tocReader = new StandardTocReader(tocFile);
final TocWriter compressedTocWriter = new StandardTocWriter(compressedTocFile, true, false)) {
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestJournalReadWrite.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestJournalReadWrite.java
index f2266e2..89eace7 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestJournalReadWrite.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestJournalReadWrite.java
@@ -42,7 +42,8 @@
final StandardEventSerializer serializer = new StandardEventSerializer();
try {
- try (final StandardJournalWriter writer = new StandardJournalWriter(journalId, journalFile, compressed, serializer)) {
+ final CompressionCodec codec = compressed ? new DeflatorCompressionCodec() : null;
+ try (final StandardJournalWriter writer = new StandardJournalWriter(journalId, journalFile, codec, serializer)) {
for (int block=0; block < 100; block++) {
writer.beginNewBlock();
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestStandardJournalReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestStandardJournalReader.java
index 9f0ba99..f29af1b 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestStandardJournalReader.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestStandardJournalReader.java
@@ -209,6 +209,8 @@
@Test
public void testReadFirstEventCompressed() throws IOException {
dos.writeBoolean(true);
+ dos.writeUTF(DeflatorCompressionCodec.DEFLATOR_COMPRESSION_CODEC);
+
writeRecords(88L, 1, true);
// write data to a file so that we can read it with the journal reader
@@ -235,6 +237,8 @@
@Test
public void testReadManyCompressed() throws IOException {
dos.writeBoolean(true);
+ dos.writeUTF(DeflatorCompressionCodec.DEFLATOR_COMPRESSION_CODEC);
+
writeRecords(0, 1024, true);
// write data to a file so that we can read it with the journal reader
@@ -266,6 +270,7 @@
@Test
public void testReadFirstEventWithBlockOffsetCompressed() throws IOException {
dos.writeBoolean(true);
+ dos.writeUTF(DeflatorCompressionCodec.DEFLATOR_COMPRESSION_CODEC);
writeRecords(0, 10, true);
final int secondBlockOffset = baos.size();
@@ -295,6 +300,8 @@
@Test
public void testReadSubsequentEventWithBlockOffsetCompressed() throws IOException {
dos.writeBoolean(true);
+ dos.writeUTF(DeflatorCompressionCodec.DEFLATOR_COMPRESSION_CODEC);
+
writeRecords(0, 10, true);
final int secondBlockOffset = baos.size();
@@ -324,6 +331,8 @@
@Test
public void testReadMultipleEventsWithBlockOffsetCompressed() throws IOException {
dos.writeBoolean(true);
+ dos.writeUTF(DeflatorCompressionCodec.DEFLATOR_COMPRESSION_CODEC);
+
writeRecords(0, 10, true);
final int secondBlockOffset = baos.size();
@@ -417,6 +426,8 @@
@Test
public void testReadEventWithBlockOffsetThenPreviousBlockOffsetCompressed() throws IOException {
dos.writeBoolean(true);
+ dos.writeUTF(DeflatorCompressionCodec.DEFLATOR_COMPRESSION_CODEC);
+
final int firstBlockOffset = baos.size();
writeRecords(0, 10, true);
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestStandardJournalWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestStandardJournalWriter.java
index e8a6787..956df80 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestStandardJournalWriter.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestStandardJournalWriter.java
@@ -46,7 +46,7 @@
try {
assertTrue( journalFile.createNewFile() );
- try (final StandardJournalWriter writer = new StandardJournalWriter(1L, journalFile, true, new StandardEventSerializer())) {
+ try (final StandardJournalWriter writer = new StandardJournalWriter(1L, journalFile, new DeflatorCompressionCodec(), new StandardEventSerializer())) {
}
} finally {
@@ -60,11 +60,11 @@
try {
assertTrue( journalFile.createNewFile() );
- try (final StandardJournalWriter writer = new StandardJournalWriter(1L, journalFile, true, new StandardEventSerializer())) {
+ try (final StandardJournalWriter writer = new StandardJournalWriter(1L, journalFile, new DeflatorCompressionCodec(), new StandardEventSerializer())) {
writer.write(Collections.singleton(TestUtil.generateEvent(1L)), 1L);
}
- try (final StandardJournalWriter writer = new StandardJournalWriter(1L, journalFile, true, new StandardEventSerializer())) {
+ try (final StandardJournalWriter writer = new StandardJournalWriter(1L, journalFile, new DeflatorCompressionCodec(), new StandardEventSerializer())) {
Assert.fail("StandardJournalWriter attempted to overwrite existing file");
} catch (final FileAlreadyExistsException faee) {
// expected
@@ -80,7 +80,7 @@
final StandardEventSerializer serializer = new StandardEventSerializer();
try {
- try (final StandardJournalWriter writer = new StandardJournalWriter(1L, journalFile, true, serializer)) {
+ try (final StandardJournalWriter writer = new StandardJournalWriter(1L, journalFile, new DeflatorCompressionCodec(), serializer)) {
writer.beginNewBlock();
writer.write(Collections.singleton(TestUtil.generateEvent(1L)), 1L);
writer.finishBlock();
@@ -100,6 +100,10 @@
// compression flag
assertEquals(true, dis.readBoolean());
+ // compression codec name
+ final String compressionCodecName = dis.readUTF();
+ assertEquals(DeflatorCompressionCodec.DEFLATOR_COMPRESSION_CODEC, compressionCodecName);
+
// read block start
final CompressionInputStream decompressedIn = new CompressionInputStream(bais);
final StandardEventDeserializer deserializer = new StandardEventDeserializer();
@@ -123,7 +127,7 @@
final StandardEventSerializer serializer = new StandardEventSerializer();
try {
- try (final StandardJournalWriter writer = new StandardJournalWriter(1L, journalFile, true, serializer)) {
+ try (final StandardJournalWriter writer = new StandardJournalWriter(1L, journalFile, new DeflatorCompressionCodec(), serializer)) {
for (int i=0; i < 1024; i++) {
writer.beginNewBlock();
writer.write(Collections.singleton(TestUtil.generateEvent(1L)), 1L);
@@ -145,6 +149,8 @@
// compression flag
assertEquals(true, dis.readBoolean());
+ assertEquals(DeflatorCompressionCodec.DEFLATOR_COMPRESSION_CODEC, dis.readUTF());
+
// read block start
for (int i=0; i < 1024; i++) {
final CompressionInputStream decompressedIn = new CompressionInputStream(bais);