blob: 3ea1e7dc3ef8f375605541b12a76a17e2a0474ae [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.nifi.wali;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.wali.DummyRecord;
import org.wali.DummyRecordSerde;
import org.wali.SerDeFactory;
import org.wali.SingletonSerDeFactory;
import org.wali.UpdateType;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
// Tests for LengthDelimitedJournal that use DummyRecord as the journaled record type.
public class TestLengthDelimitedJournal {
// Journal file that every test in this class writes to and recovers from.
private final File journalFile = new File("target/testLengthDelimitedJournal/testJournal.journal");
// Passed to each LengthDelimitedJournal constructor; assigned in setupJournal().
private SerDeFactory<DummyRecord> serdeFactory;
// The serde wrapped by serdeFactory; assigned in setupJournal().
private DummyRecordSerde serde;
// Stream pool passed to most journals under test; assigned in setupJournal().
private ObjectPool<ByteArrayDataOutputStream> streamPool;
// Constructor argument for each pooled ByteArrayDataOutputStream, and the size bound checked by the pool's first predicate.
private static final int BUFFER_SIZE = 4096;
// Builds the fixtures shared by the tests: a fresh DummyRecordSerde, a SingletonSerDeFactory wrapping it,
// and a BlockingQueuePool of ByteArrayDataOutputStream instances.
// NOTE(review): BeforeEach is imported, so this is presumably the per-test setup hook, but the annotation is not visible here.
public void setupJournal() throws IOException {
// NOTE(review): the body of this check (presumably creating the parent directory) is not visible in this extract — confirm against the full file.
if (!journalFile.getParentFile().exists()) {
serde = new DummyRecordSerde();
serdeFactory = new SingletonSerDeFactory<>(serde);
// Pool arguments, in order: the int 1 (presumably the capacity — TODO confirm against BlockingQueuePool), a factory for new
// streams, a predicate that holds while the stream's buffer size is below BUFFER_SIZE, and an action that resets the buffer.
streamPool = new BlockingQueuePool<>(1,
() -> new ByteArrayDataOutputStream(BUFFER_SIZE),
stream -> stream.getByteArrayOutputStream().size() < BUFFER_SIZE,
stream -> stream.getByteArrayOutputStream().reset());
// Writes three transactions, replaces the tail of the journal file with NUL bytes, and checks that recovery
// yields the state as of the second transaction (all three records present, each with its updated property).
public void testHandlingOfTrailingNulBytes() throws IOException {
try (final LengthDelimitedJournal<DummyRecord> journal = new LengthDelimitedJournal<>(journalFile, serdeFactory, streamPool, 0L)) {
// Transaction 1: create records "1", "2" and "3".
final List<DummyRecord> firstTransaction = new ArrayList<>();
firstTransaction.add(new DummyRecord("1", UpdateType.CREATE));
firstTransaction.add(new DummyRecord("2", UpdateType.CREATE));
firstTransaction.add(new DummyRecord("3", UpdateType.CREATE));
// Transaction 2: update each record with a single distinct property.
final List<DummyRecord> secondTransaction = new ArrayList<>();
secondTransaction.add(new DummyRecord("1", UpdateType.UPDATE).setProperty("abc", "123"));
secondTransaction.add(new DummyRecord("2", UpdateType.UPDATE).setProperty("cba", "123"));
secondTransaction.add(new DummyRecord("3", UpdateType.UPDATE).setProperty("aaa", "123"));
// Transaction 3: delete records "1" and "2". This is the transaction whose tail is damaged below.
final List<DummyRecord> thirdTransaction = new ArrayList<>();
thirdTransaction.add(new DummyRecord("1", UpdateType.DELETE));
thirdTransaction.add(new DummyRecord("2", UpdateType.DELETE));
journal.update(firstTransaction, id -> null);
journal.update(secondTransaction, id -> null);
journal.update(thirdTransaction, id -> null);
// Truncate the contents of the journal file by 8 bytes. Then replace with 28 trailing NUL bytes,
// as this is what we often see when we have a sudden power loss.
final byte[] contents = Files.readAllBytes(journalFile.toPath());
final byte[] truncated = Arrays.copyOfRange(contents, 0, contents.length - 8);
// A new byte[] is zero-filled, so the 28 bytes past the copied prefix are the NUL padding.
final byte[] withNuls = new byte[truncated.length + 28];
System.arraycopy(truncated, 0, withNuls, 0, truncated.length);
// NOTE(review): the statement that writes withNuls to fos is not visible in this extract — confirm against the full file.
try (final OutputStream fos = new FileOutputStream(journalFile)) {
try (final LengthDelimitedJournal<DummyRecord> journal = new LengthDelimitedJournal<>(journalFile, serdeFactory, streamPool, 0L)) {
final Map<Object, DummyRecord> recordMap = new HashMap<>();
final Set<String> swapLocations = new HashSet<>();
journal.recoverRecords(recordMap, swapLocations);
// All three records are still expected, i.e. the deletes of the damaged third transaction were not applied.
assertEquals(3, recordMap.size());
final DummyRecord record1 = recordMap.get("1");
assertEquals(Collections.singletonMap("abc", "123"), record1.getProperties());
final DummyRecord record2 = recordMap.get("2");
assertEquals(Collections.singletonMap("cba", "123"), record2.getProperties());
final DummyRecord record3 = recordMap.get("3");
assertEquals(Collections.singletonMap("aaa", "123"), record3.getProperties());
// Writes five single-record transactions followed by one multi-record transaction, truncates the file so the last
// transaction is incomplete, and checks that recovery reflects only the first five.
public void testUpdateOnlyAppliedIfEntireTransactionApplied() throws IOException {
try (final LengthDelimitedJournal<DummyRecord> journal = new LengthDelimitedJournal<>(journalFile, serdeFactory, streamPool, 0L)) {
// Three single-record transactions creating records "0", "1" and "2".
for (int i = 0; i < 3; i++) {
final DummyRecord record = new DummyRecord(String.valueOf(i), UpdateType.CREATE);
journal.update(Collections.singleton(record), key -> null);
// Two more single-record transactions swapping out records "1" and "2".
final DummyRecord swapOut1Record = new DummyRecord("1", UpdateType.SWAP_OUT);
journal.update(Collections.singleton(swapOut1Record), id -> null);
final DummyRecord swapOut2Record = new DummyRecord("2", UpdateType.SWAP_OUT);
journal.update(Collections.singleton(swapOut2Record), id -> null);
// Final transaction, written as one batch.
// NOTE(review): the statements that add the records below to 'records' are not visible in this extract — confirm against the full file.
final List<DummyRecord> records = new ArrayList<>();
for (int i = 0; i < 10; i++) {
final DummyRecord record = new DummyRecord("1" + i, UpdateType.CREATE);
final DummyRecord swapIn1Record = new DummyRecord("1", UpdateType.SWAP_IN);
final DummyRecord swapOut1AgainRecord = new DummyRecord("1", UpdateType.SWAP_OUT);
final DummyRecord swapIn2Record = new DummyRecord("2", UpdateType.SWAP_IN);
final DummyRecord swapOut0Record = new DummyRecord("0", UpdateType.SWAP_OUT);
journal.update(records, id -> null);
// Truncate the last 8 bytes so that we will get an EOFException when reading the last transaction.
// The stream is opened in append mode so that opening it does not itself empty the file before the truncate call.
try (final FileOutputStream fos = new FileOutputStream(journalFile, true)) {
fos.getChannel().truncate(journalFile.length() - 8);
try (final LengthDelimitedJournal<DummyRecord> journal = new LengthDelimitedJournal<>(journalFile, serdeFactory, streamPool, 0L)) {
final Map<Object, DummyRecord> recordMap = new HashMap<>();
final Set<String> swapLocations = new HashSet<>();
final JournalRecovery recovery = journal.recoverRecords(recordMap, swapLocations);
// Six transactions were written (expected max ID 5), but only five updates are expected to be counted.
assertEquals(5L, recovery.getMaxTransactionId());
assertEquals(5, recovery.getUpdateCount());
// NOTE(review): how swap-location names such as "swap12" are derived is not shown in this file — see DummyRecord/DummyRecordSerde.
final Set<String> expectedSwap = Collections.singleton("swap12");
assertEquals(expectedSwap, swapLocations);
// Only record "0" is expected to remain, in the state it had before the final batch.
final Map<Object, DummyRecord> expectedRecordMap = new HashMap<>();
expectedRecordMap.put("0", new DummyRecord("0", UpdateType.CREATE));
assertEquals(expectedRecordMap, recordMap);
// Checks that once an update has failed with an IOException, every later update and fsync on the same journal
// also fails with an IOException.
public void testPoisonedJournalNotWritableAfterIOE() throws IOException {
try (final LengthDelimitedJournal<DummyRecord> journal = new LengthDelimitedJournal<>(journalFile, serdeFactory, streamPool, 0L)) {
// Two updates that are expected to succeed.
final DummyRecord firstRecord = new DummyRecord("1", UpdateType.CREATE);
journal.update(Collections.singleton(firstRecord), key -> null);
final DummyRecord secondRecord = new DummyRecord("1", UpdateType.UPDATE);
journal.update(Collections.singleton(secondRecord), key -> firstRecord);
final DummyRecord thirdRecord = new DummyRecord("1", UpdateType.UPDATE);
final RecordLookup<DummyRecord> lookup = key -> secondRecord;
// NOTE(review): whatever arms the failure for this third update (presumably a setting on 'serde') is not visible in this extract.
assertThrows(IOException.class, () -> journal.update(Collections.singleton(thirdRecord), lookup));
// The journal should now reject all further writes, not just the one that originally failed.
final Collection<DummyRecord> records = Collections.singleton(thirdRecord);
for (int i = 0; i < 10; i++) {
assertThrows(IOException.class, () -> journal.update(records, lookup));
assertThrows(IOException.class, () -> journal.fsync());
// Checks that once an update has failed with an OutOfMemoryError, every later update and fsync on the same journal
// fails with an IOException.
public void testPoisonedJournalNotWritableAfterOOME() throws IOException {
try (final LengthDelimitedJournal<DummyRecord> journal = new LengthDelimitedJournal<>(journalFile, serdeFactory, streamPool, 0L)) {
// Two updates that are expected to succeed.
final DummyRecord firstRecord = new DummyRecord("1", UpdateType.CREATE);
journal.update(Collections.singleton(firstRecord), key -> null);
final DummyRecord secondRecord = new DummyRecord("1", UpdateType.UPDATE);
journal.update(Collections.singleton(secondRecord), key -> firstRecord);
final DummyRecord thirdRecord = new DummyRecord("1", UpdateType.UPDATE);
final RecordLookup<DummyRecord> lookup = key -> secondRecord;
// NOTE(review): whatever arms the OutOfMemoryError for this third update (presumably a setting on 'serde') is not visible in this extract.
assertThrows(OutOfMemoryError.class, () ->journal.update(Collections.singleton(thirdRecord), lookup));
// After the Error, later calls are expected to fail with IOException rather than OutOfMemoryError.
final Collection<DummyRecord> records = Collections.singleton(thirdRecord);
for (int i = 0; i < 10; i++) {
assertThrows(IOException.class, () -> journal.update(records, lookup));
assertThrows(IOException.class, () -> journal.fsync());
// Writes three single-record transactions for record "1" and checks that recovery from the same journal
// reports all three and returns the record in its final state.
public void testSuccessfulRoundTrip() throws IOException {
try (final LengthDelimitedJournal<DummyRecord> journal = new LengthDelimitedJournal<>(journalFile, serdeFactory, streamPool, 0L)) {
final DummyRecord firstRecord = new DummyRecord("1", UpdateType.CREATE);
journal.update(Collections.singleton(firstRecord), key -> null);
// Each later update's lookup returns the previous version of the record.
final DummyRecord secondRecord = new DummyRecord("1", UpdateType.UPDATE);
journal.update(Collections.singleton(secondRecord), key -> firstRecord);
final DummyRecord thirdRecord = new DummyRecord("1", UpdateType.UPDATE);
journal.update(Collections.singleton(thirdRecord), key -> secondRecord);
final Map<Object, DummyRecord> recordMap = new HashMap<>();
final Set<String> swapLocations = new HashSet<>();
final JournalRecovery recovery = journal.recoverRecords(recordMap, swapLocations);
// Three transactions were written, so the expected IDs are 0, 1 and 2
// (presumably starting from the 0L passed to the constructor — TODO confirm).
assertEquals(2L, recovery.getMaxTransactionId());
assertEquals(3, recovery.getUpdateCount());
// All three updates targeted the same ID, so exactly one record should be recovered.
assertEquals(1, recordMap.size());
final DummyRecord retrieved = recordMap.get("1");
assertEquals(thirdRecord, retrieved);
// Runs two threads that each write a 1,000-record transaction to a journal whose createOverflowDirectory hook is overridden.
// NOTE(review): large parts of this method (the override body, the stream sources for the maps, thread start/join and the
// final assertions) are not visible in this extract, so the comments below cover only what can be seen.
public void testMultipleThreadsCreatingOverflowDirectory() throws IOException, InterruptedException {
// NOTE(review): the meaning of the two extra constructor arguments (3820L, 100) is not shown in this file — confirm against LengthDelimitedJournal.
final LengthDelimitedJournal<DummyRecord> journal = new LengthDelimitedJournal<DummyRecord>(journalFile, serdeFactory, streamPool, 3820L, 100) {
protected void createOverflowDirectory(final Path path) throws IOException {
// Create the overflow directory.
// Ensure that a second call to create the overflow directory will not cause an issue.
// Ensure that the overflow directory does not exist.
try {
// First batch: CREATE records with IDs "0" through "999".
final List<DummyRecord> largeCollection1 = new ArrayList<>();
for (int i=0; i < 1_000; i++) {
largeCollection1.add(new DummyRecord(String.valueOf(i), UpdateType.CREATE));
// Lookup of records by ID. NOTE(review): the stream source expression is missing here; presumably largeCollection1.stream().
final Map<String, DummyRecord> recordMap =
.collect(Collectors.toMap(DummyRecord::getId, rec -> rec));
// Second batch: CREATE records with IDs "5000000" through "5000999", which do not overlap the first batch.
final List<DummyRecord> largeCollection2 = new ArrayList<>();
for (int i=0; i < 1_000; i++) {
largeCollection2.add(new DummyRecord(String.valueOf(5_000_000 + i), UpdateType.CREATE));
// NOTE(review): the stream source expression is missing here as well; presumably largeCollection2.stream().
final Map<String, DummyRecord> recordMap2 =
.collect(Collectors.toMap(DummyRecord::getId, rec -> rec));
// Holder for an exception raised on thread 1. NOTE(review): the catch body that would populate it is not visible.
final AtomicReference<Exception> thread1Failure = new AtomicReference<>();
final Thread t1 = new Thread(() -> {
try {
journal.update(largeCollection1, recordMap::get);
} catch (final Exception e) {
// Holder for an exception raised on thread 2. NOTE(review): the catch body that would populate it is not visible.
final AtomicReference<Exception> thread2Failure = new AtomicReference<>();
final Thread t2 = new Thread(() -> {
try {
journal.update(largeCollection2, recordMap2::get);
} catch (final Exception e) {
} finally {
// Writes three single-record transactions, cuts 8 bytes off the end of the journal file, and checks that
// recovery returns the first two transactions while dropping the third.
public void testTruncatedJournalFile() throws IOException {
// Declared outside the try block so that the recovery assertions below can compare against them.
final DummyRecord firstRecord, secondRecord;
try (final LengthDelimitedJournal<DummyRecord> journal = new LengthDelimitedJournal<>(journalFile, serdeFactory, streamPool, 0L)) {
firstRecord = new DummyRecord("1", UpdateType.CREATE);
journal.update(Collections.singleton(firstRecord), key -> null);
secondRecord = new DummyRecord("2", UpdateType.CREATE);
journal.update(Collections.singleton(secondRecord), key -> firstRecord);
// Third transaction: an UPDATE of record "1". This is the one damaged by the truncation below.
final DummyRecord thirdRecord = new DummyRecord("1", UpdateType.UPDATE);
journal.update(Collections.singleton(thirdRecord), key -> secondRecord);
// Truncate the file
// The stream is opened in append mode so that opening it does not itself empty the file before the truncate call.
try (final FileOutputStream fos = new FileOutputStream(journalFile, true)) {
fos.getChannel().truncate(journalFile.length() - 8);
// Ensure that we are able to recover the first two records without an issue but the third is lost.
try (final LengthDelimitedJournal<DummyRecord> journal = new LengthDelimitedJournal<>(journalFile, serdeFactory, streamPool, 0L)) {
final Map<Object, DummyRecord> recordMap = new HashMap<>();
final Set<String> swapLocations = new HashSet<>();
final JournalRecovery recovery = journal.recoverRecords(recordMap, swapLocations);
assertEquals(2L, recovery.getMaxTransactionId()); // transaction ID is still 2 because that's what was written to the journal
assertEquals(2, recovery.getUpdateCount()); // only 2 updates because the last update will incur an EOFException and be skipped
assertEquals(2, recordMap.size());
// Record "1" must be the originally created version, not the version from the lost third transaction.
final DummyRecord retrieved1 = recordMap.get("1");
assertEquals(firstRecord, retrieved1);
final DummyRecord retrieved2 = recordMap.get("2");
assertEquals(secondRecord, retrieved2);
* This test is rather complicated and creates a lot of odd objects with Thread.sleep, etc., and it may not be at all clear what is happening. The intent of this
* test is to try to cause a race condition to occur that would cause the journal to become corrupt. Consider the following scenario:
* 1. Thread 1 attempts to update the Journal. In the #update method, it checks the state of the Journal, which is healthy.
* 2. Thread 2 attempts to update the Journal. In the #update method, it checks the state of the Journal, which is healthy.
* 3. Thread 1 now throws an Exception when attempting to write to disk (in this case, an IOException, but it could also be an OutOfMemoryError, etc.). The Exception is caught by the Journal,
* and the #poison method is called. Before the #poison method is able to close the underlying Output Stream, Thread 2 is able to write to the Output Stream (which is no longer guarded
* by Thread 1 because of the Exception that was just thrown).
* 4. Thread 2 writes to the Journal, after Thread 1 had written only a partial update. The repository has now become corrupt.
* 5. Thread 1 closes the file handle, but does so too late. Corruption has already occurred!
* In order to replicate the above series of steps in a unit test, we need to introduce some oddities.
* - The #poison method, when called, needs to wait a bit before actually closing the underlying FileOutputStream, so that Thread 2 has a chance to run after Thread 1 stops guarding the Output
* Stream and before Thread 1 closes the Output Stream. (This is handled in the test by subclassing the Journal and overriding the #poison method to pause).
* - We need to ensure that Thread 1 and Thread 2 are both able to pass the #checkState check in #update before the corruption occurs. (This is handled in the test by the 'pausingBados' object,
* which will enter the #update method, then pause before returning the ByteArrayOutputStream, which essentially yields to Thread 1, the 'corrupting thread').
* - After both threads have passed the #checkState check, we need Thread 1 to write a partial update, then throw an Exception (This is handled in the test by the 'corruptingBados' object).
* - After the Exception is thrown, we need Thread 2 to update the contents of the repository before the file is closed. (This is handled in the test by subclassing the Journal and calling
* Thread.sleep in the #poison method).
// Drives two threads through a journal built from deliberately misbehaving streams (one that fails part-way through
// a write, one that pauses) and then checks that a fresh journal over the same file recovers zero records.
// NOTE(review): several statements (the sleep calls, the supplier argument to the pool, thread start/join and cleanup)
// are not visible in this extract; the comments below cover only what can be seen.
public void testFailureDuringWriteCannotCauseCorruption() throws IOException, InterruptedException {
// Create a ByteArrayDataOutputStream such that when attempting to write the data to another OutputStream via its ByteArrayOutputStream,
// the BADOS will copy 5 bytes, then throw an IOException. This should result in the journal being poisoned such that it can no longer
// be written to.
final ByteArrayDataOutputStream corruptingBados = new ByteArrayDataOutputStream(4096) {
final ByteArrayOutputStream baos = new ByteArrayOutputStream() {
public synchronized void writeTo(final OutputStream out) throws IOException {
// Emit only the first 5 buffered bytes, leaving a partial write in the target stream, then fail.
out.write(buf, 0, 5);
throw new IOException("Intentional Exception in Unit Test designed to cause corruption");
public synchronized String toString() {
return "Corrupting ByteArrayOutputStream[" + super.toString() + "]";
// Both accessors are overridden so that callers operate on the failing 'baos' declared above.
public ByteArrayOutputStream getByteArrayOutputStream() {
return baos;
public DataOutputStream getDataOutputStream() {
return new DataOutputStream(baos);
// Create a ByteArrayDataOutputStream such that when attempting to write the data to another OutputStream via its ByteArrayOutputStream,
// the BADOS will sleep for 1 second before writing. This allows other threads to trigger corruption in the repo in the meantime.
final ByteArrayDataOutputStream pausingBados = new ByteArrayDataOutputStream(4096) {
private final ByteArrayOutputStream baos = new ByteArrayOutputStream();
// Counts calls to getByteArrayOutputStream() so that only one specific call pauses.
private final AtomicInteger count = new AtomicInteger(0);
public ByteArrayOutputStream getByteArrayOutputStream() {
// Pause only on the second iteration.
if (count.getAndIncrement() == 1) {
// NOTE(review): the body of this try (presumably the Thread.sleep described above) is not visible in this extract.
try {
} catch (final InterruptedException ie) {
return baos;
public DataOutputStream getDataOutputStream() {
return new DataOutputStream(baos);
// Hands out pausingBados on the first call and corruptingBados on every call after that.
final Supplier<ByteArrayDataOutputStream> badosSupplier = new Supplier<ByteArrayDataOutputStream>() {
private final AtomicInteger count = new AtomicInteger(0);
public ByteArrayDataOutputStream get() {
if (count.getAndIncrement() == 0) {
return pausingBados;
return corruptingBados;
// NOTE(review): the supplier argument (presumably badosSupplier) is missing between the first and second arguments in this extract.
final ObjectPool<ByteArrayDataOutputStream> corruptingStreamPool = new BlockingQueuePool<>(2,
stream -> true,
stream -> stream.getByteArrayOutputStream().reset());
final Thread[] threads = new Thread[2];
// Journal subclass whose poison() hook is overridden so that its first invocation can be delayed.
final LengthDelimitedJournal<DummyRecord> journal = new LengthDelimitedJournal<DummyRecord>(journalFile, serdeFactory, corruptingStreamPool, 0L) {
private final AtomicInteger count = new AtomicInteger(0);
protected void poison(final Throwable t) {
if (count.getAndIncrement() == 0) { // it is only important that we sleep the first time. If we sleep every time, it just slows the test down.
// NOTE(review): the sleep call and the delegation to the superclass implementation are not visible in this extract.
try {
} catch (InterruptedException e) {
final DummyRecord firstRecord, secondRecord;
try {
firstRecord = new DummyRecord("1", UpdateType.CREATE);
secondRecord = new DummyRecord("2", UpdateType.CREATE);
// Each thread performs a single update; the IOException handlers' bodies are not visible in this extract.
final Thread t1 = new Thread(() -> {
try {
journal.update(Collections.singleton(firstRecord), key -> null);
} catch (final IOException ioe) {
final Thread t2 = new Thread(() -> {
try {
journal.update(Collections.singleton(secondRecord), key -> firstRecord);
} catch (final IOException ioe) {
threads[0] = t1;
threads[1] = t2;
// NOTE(review): the loop body (presumably starting and/or joining each thread) is not visible in this extract.
for (Thread thread : threads) {
} finally {
// Now, attempt to read from the Journal to ensure that it is not corrupt.
try (final LengthDelimitedJournal<DummyRecord> recoveryJournal = new LengthDelimitedJournal<>(journalFile, serdeFactory, corruptingStreamPool, 0L)) {
final Map<Object, DummyRecord> recordMap = new HashMap<>();
final Set<String> swapLocations = new HashSet<>();
recoveryJournal.recoverRecords(recordMap, swapLocations);
// Neither update is expected to be recoverable; the test passes when recovery completes and yields an empty map.
assertEquals(0, recordMap.size());