/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.provenance;

import static org.junit.Assert.assertEquals;

import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.provenance.search.Query;
import org.apache.nifi.provenance.search.QuerySubmission;
import org.apache.nifi.provenance.search.SearchTerms;
import org.apache.nifi.util.NiFiProperties;
import org.junit.BeforeClass;
import org.junit.Test;

public class TestVolatileProvenanceRepository {
private VolatileProvenanceRepository repo;
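
// Point NiFiProperties at the test configuration before any test runs.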
@BeforeClass
public static void setup() {
System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, "src/test/resources/nifi.properties");
}
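
/**
 * Registers ten identical RECEIVE events and verifies that they are returned
 * in order with sequential event IDs and the original attributes intact.
 */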
@Test
public void testAddAndGet() throws IOException, InterruptedException {
repo = new VolatileProvenanceRepository();
final Map<String, String> attributes = new HashMap<>();
attributes.put("abc", "xyz");
attributes.put("xyz", "abc");
attributes.put("uuid", UUID.randomUUID().toString());
final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
builder.setEventTime(System.currentTimeMillis());
builder.setEventType(ProvenanceEventType.RECEIVE);
builder.setTransitUri("nifi://unit-test");
builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
builder.setComponentId("1234");
builder.setComponentType("dummy processor");
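// Register ten copies of the event; the repository is expected to assign sequential event IDs starting at 0.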
for (int i = 0; i < 10; i++) {
repo.registerEvent(builder.build());
}
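// Ask for up to 12 events starting at ID 0; only the 10 registered events should come back.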
final List<StoredProvenanceEvent> retrieved = repo.getEvents(0L, 12);
assertEquals(10, retrieved.size());
for (int i = 0; i < 10; i++) {
final ProvenanceEventRecord recovered = retrieved.get(i);
assertEquals((long) i, recovered.getEventId());
assertEquals("nifi://unit-test", recovered.getTransitUri());
assertEquals(ProvenanceEventType.RECEIVE, recovered.getEventType());
assertEquals(attributes, recovered.getAttributes());
}
}
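
/**
 * Registers ten events with distinct FlowFile UUIDs, then runs an asynchronous
 * query with wildcard search terms and verifies that all ten events match.
 */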
@Test
public void testIndexAndCompressOnRolloverAndSubsequentSearchAsync() throws InterruptedException {
repo = new VolatileProvenanceRepository();
final String uuid = "00000000-0000-0000-0000-000000000000";
final Map<String, String> attributes = new HashMap<>();
attributes.put("abc", "xyz");
attributes.put("xyz", "abc");
attributes.put("filename", "file-" + uuid);
final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
builder.setEventTime(System.currentTimeMillis());
builder.setEventType(ProvenanceEventType.RECEIVE);
builder.setTransitUri("nifi://unit-test");
builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
builder.setComponentId("1234");
builder.setComponentType("dummy processor");
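// Register ten events, each for a FlowFile whose UUID differs only in its last digit.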
for (int i = 0; i < 10; i++) {
attributes.put("uuid", "00000000-0000-0000-0000-00000000000" + i);
builder.fromFlowFile(createFlowFile(i, 3000L, attributes));
repo.registerEvent(builder.build());
}
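// Build a query whose wildcard terms (UUID, filename, component ID, transit URI) should match every registered event.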
final Query query = new Query(UUID.randomUUID().toString());
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.FlowFileUUID, "00000*"));
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.Filename, "file-*"));
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.ComponentID, "12?4"));
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.TransitURI, "nifi://*"));
query.setMaxResults(100);
final QuerySubmission submission = repo.submitQuery(query);
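// Poll until the asynchronous query submission completes.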
while (!submission.getResult().isFinished()) {
Thread.sleep(100L);
}
assertEquals(10, submission.getResult().getMatchingEvents().size());
for (final ProvenanceEventRecord match : submission.getResult().getMatchingEvents()) {
System.out.println(match);
}
}
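
/**
 * Builds an in-memory FlowFile stub with the given ID, size, and a defensive
 * copy of the supplied attributes.
 */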
private FlowFile createFlowFile(final long id, final long fileSize, final Map<String, String> attributes) {
final Map<String, String> attrCopy = new HashMap<>(attributes);
return new FlowFile() {
@Override
public long getId() {
return id;
}
@Override
public long getEntryDate() {
return System.currentTimeMillis();
}
@Override
public Set<String> getLineageIdentifiers() {
return new HashSet<String>();
}
@Override
public long getLineageStartDate() {
return System.currentTimeMillis();
}
@Override
public boolean isPenalized() {
return false;
}
@Override
public String getAttribute(final String s) {
return attrCopy.get(s);
}
@Override
public long getSize() {
return fileSize;
}
@Override
public Map<String, String> getAttributes() {
return attrCopy;
}
@Override
public int compareTo(final FlowFile o) {
return 0;
}
@Override
public Long getLastQueueDate() {
return System.currentTimeMillis();
}
};
}
}