/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.wali;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.wali.DummyRecord;
import org.wali.DummyRecordSerde;
import org.wali.SerDeFactory;
import org.wali.SingletonSerDeFactory;
import org.wali.UpdateType;
import org.wali.WriteAheadRepository;
import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.text.NumberFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
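/**
 * Tests for {@link SequentialAccessWriteAheadLog}, covering recovery with and without checkpoints,
 * recovery of large transactions written out as external file references, swap in/out handling,
 * and a manual performance test.
 *
 * <p>A minimal usage sketch, based only on the calls exercised by these tests (the directory name is
 * illustrative, not required by the API):</p>
 *
 * <pre>{@code
 * final SerDeFactory<DummyRecord> serdeFactory = new SingletonSerDeFactory<>(new DummyRecordSerde());
 * final SequentialAccessWriteAheadLog<DummyRecord> wal =
 *     new SequentialAccessWriteAheadLog<>(new File("target/example-wal"), serdeFactory);
 * final Collection<DummyRecord> restored = wal.recoverRecords(); // these tests always recover before updating
 * wal.update(Collections.singleton(new DummyRecord("1", UpdateType.CREATE)), false);
 * wal.checkpoint();
 * wal.shutdown();
 * }</pre>
 */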
public class TestSequentialAccessWriteAheadLog {
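// Supplies the name of the currently running test method, which is used below to give each test
// its own storage directory under target/.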
@Rule
public TestName testName = new TestName();
@Test
public void testUpdateWithExternalFile() throws IOException {
final DummyRecordSerde serde = new DummyRecordSerde();
final SequentialAccessWriteAheadLog<DummyRecord> repo = createWriteRepo(serde);
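// Build a batch large enough that the journal is expected to overflow its in-heap serialization limit
// and write the transaction out as an external file (verified via the serde's external file references below).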
final List<DummyRecord> records = new ArrayList<>();
for (int i = 0; i < 350_000; i++) {
final DummyRecord record = new DummyRecord(String.valueOf(i), UpdateType.CREATE);
records.add(record);
}
repo.update(records, false);
repo.shutdown();
assertEquals(1, serde.getExternalFileReferences().size());
final SequentialAccessWriteAheadLog<DummyRecord> recoveryRepo = createRecoveryRepo();
final Collection<DummyRecord> recovered = recoveryRepo.recoverRecords();
// ensure that we get the same records back, but the order may be different, so wrap both collections
// in a HashSet so that we can compare unordered collections of the same type.
assertEquals(new HashSet<>(records), new HashSet<>(recovered));
}
@Test
public void testUpdateWithExternalFileFollowedByInlineUpdate() throws IOException {
final DummyRecordSerde serde = new DummyRecordSerde();
final SequentialAccessWriteAheadLog<DummyRecord> repo = createWriteRepo(serde);
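// Same large batch as above; the transaction is again expected to be recorded as a single external file reference.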
final List<DummyRecord> records = new ArrayList<>();
for (int i = 0; i < 350_000; i++) {
final DummyRecord record = new DummyRecord(String.valueOf(i), UpdateType.CREATE);
records.add(record);
}
repo.update(records, false);
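// Follow the large (external-file) transaction with a single small inline update to verify that both
// are recovered together.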
final DummyRecord subsequentRecord = new DummyRecord("350001", UpdateType.CREATE);
repo.update(Collections.singleton(subsequentRecord), false);
repo.shutdown();
assertEquals(1, serde.getExternalFileReferences().size());
final SequentialAccessWriteAheadLog<DummyRecord> recoveryRepo = createRecoveryRepo();
final Collection<DummyRecord> recovered = recoveryRepo.recoverRecords();
// ensure that we get the same records back, but the order may be different, so wrap both collections
// in a HashSet so that we can compare unordered collections of the same type.
final Set<DummyRecord> expectedRecords = new HashSet<>(records);
expectedRecords.add(subsequentRecord);
assertEquals(expectedRecords, new HashSet<>(recovered));
}
@Test
public void testRecoverWithNoCheckpoint() throws IOException {
final SequentialAccessWriteAheadLog<DummyRecord> repo = createWriteRepo();
final List<DummyRecord> records = new ArrayList<>();
for (int i = 0; i < 10; i++) {
final DummyRecord record = new DummyRecord(String.valueOf(i), UpdateType.CREATE);
records.add(record);
}
repo.update(records, false);
repo.shutdown();
final SequentialAccessWriteAheadLog<DummyRecord> recoveryRepo = createRecoveryRepo();
final Collection<DummyRecord> recovered = recoveryRepo.recoverRecords();
// ensure that we get the same records back, but the order may be different, so wrap both collections
// in a HashSet so that we can compare unordered collections of the same type.
assertEquals(new HashSet<>(records), new HashSet<>(recovered));
}
@Test
public void testRecoverWithNoJournalUpdates() throws IOException {
final SequentialAccessWriteAheadLog<DummyRecord> repo = createWriteRepo();
final List<DummyRecord> records = new ArrayList<>();
for (int i = 0; i < 10; i++) {
final DummyRecord record = new DummyRecord(String.valueOf(i), UpdateType.CREATE);
records.add(record);
}
repo.update(records, false);
repo.checkpoint();
repo.shutdown();
final SequentialAccessWriteAheadLog<DummyRecord> recoveryRepo = createRecoveryRepo();
final Collection<DummyRecord> recovered = recoveryRepo.recoverRecords();
// ensure that we get the same records back, but the order may be different, so wrap both collections
// in a HashSet so that we can compare unordered collections of the same type.
assertEquals(new HashSet<>(records), new HashSet<>(recovered));
}
@Test
public void testRecoverWithMultipleCheckpointsBetweenJournalUpdate() throws IOException {
final SequentialAccessWriteAheadLog<DummyRecord> repo = createWriteRepo();
final List<DummyRecord> records = new ArrayList<>();
for (int i = 0; i < 10; i++) {
final DummyRecord record = new DummyRecord(String.valueOf(i), UpdateType.CREATE);
records.add(record);
}
repo.update(records, false);
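// Checkpoint several times with no intervening updates; the records written above must survive every checkpoint.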
for (int i = 0; i < 8; i++) {
repo.checkpoint();
}
final DummyRecord updateRecord = new DummyRecord("4", UpdateType.UPDATE);
updateRecord.setProperties(Collections.singletonMap("updated", "true"));
repo.update(Collections.singleton(updateRecord), false);
repo.shutdown();
final SequentialAccessWriteAheadLog<DummyRecord> recoveryRepo = createRecoveryRepo();
final Collection<DummyRecord> recovered = recoveryRepo.recoverRecords();
// We expect the same records that we created, except that the CREATE record for id 4 is replaced by
// the UPDATE record for id 4, since only the UPDATE will be recovered for that id.
final Set<DummyRecord> expected = new HashSet<>(records);
expected.remove(new DummyRecord("4", UpdateType.CREATE));
expected.add(updateRecord);
// ensure that we get the same records back, but the order may be different, so wrap both collections
// in a HashSet so that we can compare unordered collections of the same type.
assertEquals(expected, new HashSet<>(recovered));
}
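/**
 * Opens a new write-ahead log over the storage directory that the current test previously wrote to,
 * without clearing it, so that the caller can invoke recoverRecords() against the existing journals and checkpoint.
 */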
private SequentialAccessWriteAheadLog<DummyRecord> createRecoveryRepo() throws IOException {
final File targetDir = new File("target");
final File storageDir = new File(targetDir, testName.getMethodName());
final DummyRecordSerde serde = new DummyRecordSerde();
final SerDeFactory<DummyRecord> serdeFactory = new SingletonSerDeFactory<>(serde);
final SequentialAccessWriteAheadLog<DummyRecord> repo = new SequentialAccessWriteAheadLog<>(storageDir, serdeFactory);
return repo;
}
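/**
 * Creates a fresh write-ahead log under target/&lt;test method name&gt;, deleting any leftover state first
 * and asserting that recovery of the brand-new repository yields no records.
 */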
private SequentialAccessWriteAheadLog<DummyRecord> createWriteRepo() throws IOException {
return createWriteRepo(new DummyRecordSerde());
}
private SequentialAccessWriteAheadLog<DummyRecord> createWriteRepo(final DummyRecordSerde serde) throws IOException {
final File targetDir = new File("target");
final File storageDir = new File(targetDir, testName.getMethodName());
deleteRecursively(storageDir);
assertTrue(storageDir.mkdirs());
final SerDeFactory<DummyRecord> serdeFactory = new SingletonSerDeFactory<>(serde);
final SequentialAccessWriteAheadLog<DummyRecord> repo = new SequentialAccessWriteAheadLog<>(storageDir, serdeFactory);
final Collection<DummyRecord> recovered = repo.recoverRecords();
assertNotNull(recovered);
assertTrue(recovered.isEmpty());
return repo;
}
/**
* This test is designed to update the repository in several different ways, testing CREATE, UPDATE, SWAP IN, SWAP OUT, and DELETE
* update types, as well as testing updates with single records and with multiple records in a transaction. It also verifies that we
* are able to checkpoint, then update journals, and then recover updates to both the checkpoint and the journals.
*/
@Test
public void testUpdateThenRecover() throws IOException {
final SequentialAccessWriteAheadLog<DummyRecord> repo = createWriteRepo();
final DummyRecord firstCreate = new DummyRecord("0", UpdateType.CREATE);
repo.update(Collections.singleton(firstCreate), false);
final List<DummyRecord> creations = new ArrayList<>();
for (int i = 1; i < 11; i++) {
final DummyRecord record = new DummyRecord(String.valueOf(i), UpdateType.CREATE);
creations.add(record);
}
repo.update(creations, false);
final DummyRecord deleteRecord3 = new DummyRecord("3", UpdateType.DELETE);
repo.update(Collections.singleton(deleteRecord3), false);
final DummyRecord swapOutRecord4 = new DummyRecord("4", UpdateType.SWAP_OUT);
swapOutRecord4.setSwapLocation("swap");
final DummyRecord swapOutRecord5 = new DummyRecord("5", UpdateType.SWAP_OUT);
swapOutRecord5.setSwapLocation("swap");
final List<DummyRecord> swapOuts = new ArrayList<>();
swapOuts.add(swapOutRecord4);
swapOuts.add(swapOutRecord5);
repo.update(swapOuts, false);
final DummyRecord swapInRecord5 = new DummyRecord("5", UpdateType.SWAP_IN);
swapInRecord5.setSwapLocation("swap");
repo.update(Collections.singleton(swapInRecord5), false);
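// 11 records created, minus 1 deleted (id 3) and 2 swapped out (ids 4 and 5), plus 1 swapped back in (id 5) = 9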
final int recordCount = repo.checkpoint();
assertEquals(9, recordCount);
final DummyRecord updateRecord6 = new DummyRecord("6", UpdateType.UPDATE);
updateRecord6.setProperties(Collections.singletonMap("greeting", "hello"));
repo.update(Collections.singleton(updateRecord6), false);
final List<DummyRecord> updateRecords = new ArrayList<>();
for (int i = 7; i < 11; i++) {
final DummyRecord updateRecord = new DummyRecord(String.valueOf(i), UpdateType.UPDATE);
updateRecord.setProperties(Collections.singletonMap("greeting", "hi"));
updateRecords.add(updateRecord);
}
final DummyRecord deleteRecord2 = new DummyRecord("2", UpdateType.DELETE);
updateRecords.add(deleteRecord2);
repo.update(updateRecords, false);
repo.shutdown();
final SequentialAccessWriteAheadLog<DummyRecord> recoveryRepo = createRecoveryRepo();
final Collection<DummyRecord> recoveredRecords = recoveryRepo.recoverRecords();
// We should now have records:
// 0-10 CREATED
// 2 & 3 deleted
// 4 & 5 swapped out
// 5 swapped back in
// 6 updated with greeting = hello
// 7-10 updated with greeting = hi
assertEquals(8, recoveredRecords.size());
final Map<String, DummyRecord> recordMap = recoveredRecords.stream()
.collect(Collectors.toMap(record -> record.getId(), Function.identity()));
assertFalse(recordMap.containsKey("2"));
assertFalse(recordMap.containsKey("3"));
assertFalse(recordMap.containsKey("4"));
assertTrue(recordMap.get("1").getProperties().isEmpty());
assertTrue(recordMap.get("5").getProperties().isEmpty());
assertEquals("hello", recordMap.get("6").getProperties().get("greeting"));
for (int i = 7; i < 11; i++) {
assertEquals("hi", recordMap.get(String.valueOf(i)).getProperties().get("greeting"));
}
recoveryRepo.shutdown();
}
@Test
@Ignore("For manual performance testing")
public void testUpdatePerformance() throws IOException, InterruptedException {
final Path path = Paths.get("target/sequential-access-repo");
deleteRecursively(path.toFile());
assertTrue(path.toFile().mkdirs());
final DummyRecordSerde serde = new DummyRecordSerde();
final SerDeFactory<DummyRecord> serdeFactory = new SingletonSerDeFactory<>(serde);
final WriteAheadRepository<DummyRecord> repo = new SequentialAccessWriteAheadLog<>(path.toFile(), serdeFactory);
final Collection<DummyRecord> initialRecs = repo.recoverRecords();
assertTrue(initialRecs.isEmpty());
final long updateCountPerThread = 1_000_000;
final int numThreads = 4;
final Thread[] threads = new Thread[numThreads];
final int batchSize = 1;
long previousBytes = 0L;
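// Two passes over the workload: the first acts as a warmup (and is labeled as such in the output),
// the second is the measured run.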
for (int j = 0; j < 2; j++) {
for (int i = 0; i < numThreads; i++) {
final Thread t = new Thread(new Runnable() {
@Override
public void run() {
final List<DummyRecord> batch = new ArrayList<>();
for (int i = 0; i < updateCountPerThread / batchSize; i++) {
batch.clear();
for (int j = 0; j < batchSize; j++) {
final DummyRecord record = new DummyRecord(String.valueOf(i), UpdateType.CREATE);
batch.add(record);
}
try {
repo.update(batch, false);
} catch (Throwable t) {
t.printStackTrace();
Assert.fail(t.toString());
}
}
}
});
threads[i] = t;
}
final long start = System.nanoTime();
for (final Thread t : threads) {
t.start();
}
for (final Thread t : threads) {
t.join();
}
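// Sum the journal file sizes and subtract the previous pass's total so that only the bytes written
// during this pass are reported.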
long bytes = 0L;
for (final File journalFile : path.resolve("journals").toFile().listFiles()) {
bytes += journalFile.length();
}
bytes -= previousBytes;
previousBytes = bytes;
final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
final long eventsPerSecond = (updateCountPerThread * numThreads * 1000) / millis;
final String eps = NumberFormat.getInstance().format(eventsPerSecond);
final long bytesPerSecond = bytes * 1000 / millis;
final String bps = NumberFormat.getInstance().format(bytesPerSecond);
if (j == 0) {
System.out.println(millis + " ms to insert " + updateCountPerThread * numThreads + " updates using " + numThreads
+ " threads, *as a warmup!* " + eps + " events per second, " + bps + " bytes per second");
} else {
System.out.println(millis + " ms to insert " + updateCountPerThread * numThreads + " updates using " + numThreads
+ " threads, " + eps + " events per second, " + bps + " bytes per second");
}
}
}
private void deleteRecursively(final File file) {
final File[] children = file.listFiles();
if (children != null) {
for (final File child : children) {
deleteRecursively(child);
}
}
file.delete();
}
}