blob: 338aa7e83a5bf1a975de1c563f3e3ae769a09fb8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License") you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.provenance
import org.apache.nifi.events.EventReporter
import org.apache.nifi.flowfile.FlowFile
import org.apache.nifi.provenance.serialization.RecordReaders
import org.apache.nifi.reporting.Severity
import org.apache.nifi.security.kms.StaticKeyProvider
import org.apache.nifi.util.NiFiProperties
import org.apache.nifi.util.file.FileUtils
import org.bouncycastle.jce.provider.BouncyCastleProvider
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.BeforeAll
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import java.security.Security
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicLong
import static org.apache.nifi.provenance.TestUtil.createFlowFile
class EncryptedWriteAheadProvenanceRepositoryTest {
private static final String KEY_HEX_128 = "0123456789ABCDEFFEDCBA9876543210"
private static final String KEY_HEX = KEY_HEX_128
private static final String KEY_ID = "K1"
private static final String TRANSIT_URI = "nifi://unit-test"
private static final String PROCESSOR_TYPE = "Mock Processor"
private static final String COMPONENT_ID = "1234"
private static final AtomicLong recordId = new AtomicLong()
private ProvenanceRepository repo
private static RepositoryConfiguration config
private EventReporter eventReporter
private List<ReportedEvent> reportedEvents = Collections.synchronizedList(new ArrayList<ReportedEvent>())
@BeforeAll
static void setUpOnce() throws Exception {
Security.addProvider(new BouncyCastleProvider())
}
@BeforeEach
void setUp() throws Exception {
reportedEvents?.clear()
eventReporter = createMockEventReporter()
}
@AfterEach
void tearDown() throws Exception {
closeRepo(repo, config)
}
private static RepositoryConfiguration createConfiguration() {
RepositoryConfiguration config = new RepositoryConfiguration()
config.addStorageDirectory("1", File.createTempDir(getClass().simpleName))
config.setCompressOnRollover(true)
config.setMaxEventFileLife(2000L, TimeUnit.SECONDS)
config.setCompressionBlockBytes(100)
return config
}
private EventReporter createMockEventReporter() {
[reportEvent: { Severity s, String c, String m ->
ReportedEvent event = new ReportedEvent(s, c, m)
reportedEvents.add(event)
}] as EventReporter
}
private void closeRepo(ProvenanceRepository repo = this.repo, RepositoryConfiguration config = this.config) throws IOException {
if (repo == null) {
return
}
try {
repo.close()
} catch (IOException ioe) {
}
// Delete all of the storage files. We do this in order to clean up the tons of files that
// we create but also to ensure that we have closed all of the file handles. If we leave any
// streams open, for instance, this will throw an IOException, causing our unit test to fail.
if (config != null) {
for (final File storageDir : config.getStorageDirectories().values()) {
int i
for (i = 0; i < 3; i++) {
try {
FileUtils.deleteFile(storageDir, true)
break
} catch (IOException ioe) {
// if there is a virus scanner, etc. running in the background we may not be able to
// delete the file. Wait a sec and try again.
if (i == 2) {
throw ioe
} else {
try {
System.out.println("file: " + storageDir.toString() + " exists=" + storageDir.exists())
FileUtils.deleteFile(storageDir, true)
break
} catch (final IOException ioe2) {
// if there is a virus scanner, etc. running in the background we may not be able to
// delete the file. Wait a sec and try again.
if (i == 2) {
throw ioe2
} else {
try {
Thread.sleep(1000L)
} catch (final InterruptedException ie) {
}
}
}
}
}
}
}
}
}
private static
final FlowFile buildFlowFile(Map attributes = [:], long id = recordId.getAndIncrement(), long fileSize = 3000L) {
if (!attributes?.uuid) {
attributes.uuid = UUID.randomUUID().toString()
}
createFlowFile(id, fileSize, attributes)
}
private
static ProvenanceEventRecord buildEventRecord(FlowFile flowfile = buildFlowFile(), ProvenanceEventType eventType = ProvenanceEventType.RECEIVE, String transitUri = TRANSIT_URI, String componentId = COMPONENT_ID, String componentType = PROCESSOR_TYPE, long eventTime = System.currentTimeMillis()) {
final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder()
builder.setEventTime(eventTime)
builder.setEventType(eventType)
builder.setTransitUri(transitUri)
builder.fromFlowFile(flowfile)
builder.setComponentId(componentId)
builder.setComponentType(componentType)
builder.build()
}
/**
* This test operates on {@link WriteAheadProvenanceRepository} to verify the normal operations of existing implementations.
*
* @throws IOException
* @throws InterruptedException
*/
@Test
void testWriteAheadProvenanceRepositoryShouldRegisterAndRetrieveEvents() throws IOException, InterruptedException {
// Arrange
config = createConfiguration()
// Needed until NIFI-3605 is implemented
// config.setMaxEventFileCapacity(1L)
config.setMaxEventFileCount(1)
config.setMaxEventFileLife(1, TimeUnit.SECONDS)
repo = new WriteAheadProvenanceRepository(config)
repo.initialize(eventReporter, null, null, IdentifierLookup.EMPTY)
Map attributes = ["abc": "xyz",
"123": "456"]
final ProvenanceEventRecord record = buildEventRecord(buildFlowFile(attributes))
final int RECORD_COUNT = 10
// Act
RECORD_COUNT.times {
repo.registerEvent(record)
}
// Ensure there is not a timing issue retrieving all records
Thread.sleep(1000)
final List<ProvenanceEventRecord> recoveredRecords = repo.getEvents(0L, RECORD_COUNT + 1)
// Assert
assert recoveredRecords.size() == RECORD_COUNT
recoveredRecords.eachWithIndex { ProvenanceEventRecord recovered, int i ->
assert recovered.getEventId() == (i as Long)
assert recovered.getTransitUri() == TRANSIT_URI
assert recovered.getEventType() == ProvenanceEventType.RECEIVE
// The UUID was added later but we care that all attributes we provided are still there
assert recovered.getAttributes().entrySet().containsAll(attributes.entrySet())
}
}
@Test
void testShouldRegisterAndGetEvents() {
// Arrange
final int RECORD_COUNT = 10
NiFiProperties properties = NiFiProperties.createBasicNiFiProperties(null, [
(NiFiProperties.PROVENANCE_REPO_ENCRYPTION_KEY_PROVIDER_IMPLEMENTATION_CLASS): StaticKeyProvider.class.name,
(NiFiProperties.PROVENANCE_REPO_ENCRYPTION_KEY): KEY_HEX,
(NiFiProperties.PROVENANCE_REPO_ENCRYPTION_KEY_ID): KEY_ID
])
repo = new EncryptedWriteAheadProvenanceRepository(properties)
repo.initialize(eventReporter, null, null, IdentifierLookup.EMPTY)
Map attributes = ["abc": "This is a plaintext attribute.",
"123": "This is another plaintext attribute."]
final List<ProvenanceEventRecord> records = []
RECORD_COUNT.times { int i ->
records << buildEventRecord(buildFlowFile(attributes + [count: i as String]))
}
final long LAST_RECORD_ID = repo.getMaxEventId()
// Act
repo.registerEvents(records)
// Retrieve the events through the interface
List<ProvenanceEventRecord> recoveredRecords = repo.getEvents(LAST_RECORD_ID + 1, RECORD_COUNT * 2)
// Assert
recoveredRecords.eachWithIndex { ProvenanceEventRecord recoveredRecord, int i ->
assert recoveredRecord.getEventId() == LAST_RECORD_ID + 1 + i
assert recoveredRecord.getTransitUri() == TRANSIT_URI
assert recoveredRecord.getEventType() == ProvenanceEventType.RECEIVE
// The UUID was added later but we care that all attributes we provided are still there
assert recoveredRecord.getAttributes().entrySet().containsAll(attributes.entrySet())
assert recoveredRecord.getAttribute("count") == i as String
}
}
private static class ReportedEvent {
final Severity severity
final String category
final String message
ReportedEvent(final Severity severity, final String category, final String message) {
this.severity = severity
this.category = category
this.message = message
}
@Override
String toString() {
"ReportedEvent [${severity}] ${category}: ${message}"
}
}
}