blob: bd084bc4c4854a786bc33849eb7ff8052307028b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.provenance;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.provenance.lineage.ComputeLineageSubmission;
import org.apache.nifi.provenance.lineage.FlowFileLineage;
import org.apache.nifi.provenance.lineage.Lineage;
import org.apache.nifi.provenance.lineage.LineageComputationType;
import org.apache.nifi.provenance.query.ProvenanceQuerySubmission;
import org.apache.nifi.provenance.search.Query;
import org.apache.nifi.provenance.search.QueryResult;
import org.apache.nifi.provenance.search.QuerySubmission;
import org.apache.nifi.provenance.search.SearchTerm;
import org.apache.nifi.provenance.search.SearchableField;
import org.apache.nifi.util.IntegerHolder;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.RingBuffer;
import org.apache.nifi.util.RingBuffer.Filter;
import org.apache.nifi.util.RingBuffer.ForEachEvaluator;
import org.apache.nifi.util.RingBuffer.IterationDirection;
public class VolatileProvenanceRepository implements ProvenanceEventRepository {
// Limit on outstanding text-query submissions. In this file it is referenced only from the
// commented-out body of submitQuery(String).
private static final int MAX_CONCURRENT_QUERIES = 10;
// properties
// Name of the NiFi property whose integer value is passed to the RingBuffer constructor.
public static final String BUFFER_SIZE = "nifi.provenance.repository.buffer.size";
// default property values
// Value used for BUFFER_SIZE when the property is not set.
public static final int DEFAULT_BUFFER_SIZE = 10000;
// In-memory store of every registered event; all lookups and queries in this class scan it.
private final RingBuffer<StoredProvenanceEvent> ringBuffer;
// Unmodifiable lists parsed in the constructor from the indexed-fields / indexed-attributes properties.
private final List<SearchableField> searchableFields;
private final List<SearchableField> searchableAttributes;
// Runs QueryRunnable and ComputeLineageRunnable tasks (fixed pool of 2 threads, see constructor).
private final ExecutorService queryExecService;
// Runs the periodic RemoveExpiredQueryResults task scheduled by initialize().
private final ScheduledExecutorService scheduledExecService;
// Submissions keyed by query identifier / lineage identifier, so they can be retrieved later.
private final ConcurrentMap<String, AsyncQuerySubmission> querySubmissionMap = new ConcurrentHashMap<>();
private final ConcurrentMap<String, AsyncLineageSubmission> lineageSubmissionMap = new ConcurrentHashMap<>();
// Only referenced from the commented-out text-query code further down in this file.
private final ConcurrentMap<String, ProvenanceQuerySubmission> provQuerySubmissionMap = new ConcurrentHashMap<>();
// Source of event IDs; registerEvent() takes the current value and then increments it.
private final AtomicLong idGenerator = new AtomicLong(0L);
// Set on the first call to initialize() so that later calls return without scheduling anything.
private final AtomicBoolean initialized = new AtomicBoolean(false);
/**
 * Builds the repository from the singleton {@link NiFiProperties}: sizes the ring buffer
 * from {@link #BUFFER_SIZE} (falling back to {@link #DEFAULT_BUFFER_SIZE}), parses the
 * indexed fields and attributes, and creates the two executors used for queries and for
 * scheduled maintenance.
 */
public VolatileProvenanceRepository() {
    final NiFiProperties nifiProperties = NiFiProperties.getInstance();

    final int capacity = nifiProperties.getIntegerProperty(BUFFER_SIZE, DEFAULT_BUFFER_SIZE);
    ringBuffer = new RingBuffer<>(capacity);

    final String fieldNames = nifiProperties.getProperty(NiFiProperties.PROVENANCE_INDEXED_FIELDS);
    final String attributeNames = nifiProperties.getProperty(NiFiProperties.PROVENANCE_INDEXED_ATTRIBUTES);
    searchableFields = Collections.unmodifiableList(SearchableFieldParser.extractSearchableFields(fieldNames, true));
    searchableAttributes = Collections.unmodifiableList(SearchableFieldParser.extractSearchableFields(attributeNames, false));

    // Wrap the default factory only to give query threads a recognizable, numbered name.
    final ThreadFactory baseFactory = Executors.defaultThreadFactory();
    final ThreadFactory queryThreadFactory = new ThreadFactory() {
        private final AtomicInteger sequence = new AtomicInteger(0);

        @Override
        public Thread newThread(final Runnable task) {
            final Thread queryThread = baseFactory.newThread(task);
            queryThread.setName("Provenance Query Thread-" + sequence.incrementAndGet());
            return queryThread;
        }
    };
    queryExecService = Executors.newFixedThreadPool(2, queryThreadFactory);

    scheduledExecService = Executors.newScheduledThreadPool(2);
}
/**
 * Schedules {@code RemoveExpiredQueryResults} to run every 30 seconds (after an initial
 * 30-second delay). Only the first invocation schedules the task; later calls do nothing.
 *
 * @param eventReporter not used by this implementation
 */
@Override
public void initialize(final EventReporter eventReporter) {
    final boolean firstInvocation = initialized.compareAndSet(false, true);
    if (firstInvocation) {
        scheduledExecService.scheduleWithFixedDelay(new RemoveExpiredQueryResults(), 30L, 30L, TimeUnit.SECONDS);
    }
}
/**
 * @return a new {@code StandardProvenanceEventRecord.Builder}
 */
@Override
public ProvenanceEventBuilder eventBuilder() {
    final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
    return builder;
}
/**
 * Assigns the next generated ID to the event and adds the ID-enriched copy to the ring buffer.
 *
 * @param event the event to store
 */
@Override
public void registerEvent(final ProvenanceEventRecord event) {
    final long eventId = idGenerator.getAndIncrement();
    final IdEnrichedProvenanceEvent enriched = new IdEnrichedProvenanceEvent(event, eventId);
    ringBuffer.add(enriched);
}
/**
 * Registers each event of the collection, in iteration order, via
 * {@link #registerEvent(ProvenanceEventRecord)}.
 *
 * @param events the events to store
 */
@Override
public void registerEvents(final Collection<ProvenanceEventRecord> events) {
    final Iterator<ProvenanceEventRecord> pending = events.iterator();
    while (pending.hasNext()) {
        registerEvent(pending.next());
    }
}
/**
 * Selects buffered events whose ID is at least {@code firstRecordId}.
 *
 * @param firstRecordId lowest event ID to include
 * @param maxRecords limit handed to the ring buffer's selection
 * @return the events selected by the ring buffer
 * @throws IOException declared by the interface; not thrown by the visible code
 */
@Override
public List<StoredProvenanceEvent> getEvents(final long firstRecordId, final int maxRecords) throws IOException {
    final Filter<StoredProvenanceEvent> atOrAfterFirstId = new Filter<StoredProvenanceEvent>() {
        @Override
        public boolean select(final StoredProvenanceEvent candidate) {
            return candidate.getEventId() >= firstRecordId;
        }
    };

    return ringBuffer.getSelectedElements(atOrAfterFirstId, maxRecords);
}
/**
 * @return the ID of the ring buffer's newest element, or {@code null} when the buffer
 *         reports no newest element
 */
@Override
public Long getMaxEventId() {
    final ProvenanceEventRecord latest = ringBuffer.getNewestElement();
    if (latest == null) {
        return null;
    }
    return latest.getEventId();
}
/**
 * Looks up one buffered event by FlowFile UUID.
 *
 * @param identifier the FlowFile UUID to look for
 * @return the first event the ring buffer selects with that FlowFile UUID, or {@code null} if none
 * @throws IOException declared for callers; not thrown by the visible code
 */
public StoredProvenanceEvent getEvent(final String identifier) throws IOException {
    final Filter<StoredProvenanceEvent> sameFlowFile = new Filter<StoredProvenanceEvent>() {
        @Override
        public boolean select(final StoredProvenanceEvent candidate) {
            return identifier.equals(candidate.getFlowFileUuid());
        }
    };

    final List<StoredProvenanceEvent> matches = ringBuffer.getSelectedElements(sameFlowFile, 1);
    if (matches.isEmpty()) {
        return null;
    }
    return matches.get(0);
}
/**
 * Looks up one buffered event by event ID.
 *
 * @param id the event ID to look for
 * @return the first event the ring buffer selects with that ID, or {@code null} if none
 */
@Override
public StoredProvenanceEvent getEvent(final long id) {
    final Filter<StoredProvenanceEvent> sameId = new Filter<StoredProvenanceEvent>() {
        @Override
        public boolean select(final StoredProvenanceEvent candidate) {
            return candidate.getEventId() == id;
        }
    };

    final List<StoredProvenanceEvent> matches = ringBuffer.getSelectedElements(sameId, 1);
    if (matches.isEmpty()) {
        return null;
    }
    return matches.get(0);
}
/**
 * Shuts down both executors: {@code shutdownNow()} on the query executor and
 * {@code shutdown()} on the scheduled executor.
 *
 * @throws IOException declared by the interface; not thrown by the visible code
 */
@Override
public void close() throws IOException {
queryExecService.shutdownNow();
scheduledExecService.shutdown();
}
/**
 * @return the unmodifiable list of searchable fields parsed in the constructor from the
 *         {@code NiFiProperties.PROVENANCE_INDEXED_FIELDS} property
 */
@Override
public List<SearchableField> getSearchableFields() {
return searchableFields;
}
/**
 * @return the unmodifiable list of searchable attributes parsed in the constructor from the
 *         {@code NiFiProperties.PROVENANCE_INDEXED_ATTRIBUTES} property
 */
@Override
public List<SearchableField> getSearchableAttributes() {
return searchableAttributes;
}
/**
 * Submits the query and blocks, polling every 100 ms, until its result is finished.
 * <p>
 * The wait is not cancellable: an interrupt received while waiting does not abort the
 * wait, but the thread's interrupt status is restored before this method returns or
 * throws, so callers can still observe it.
 *
 * @param query the query to run
 * @return the finished result
 * @throws IOException if the finished result reports an error
 */
public QueryResult queryEvents(final Query query) throws IOException {
    final QuerySubmission submission = submitQuery(query);
    final QueryResult result = submission.getResult();

    boolean interrupted = false;
    try {
        while (!result.isFinished()) {
            try {
                Thread.sleep(100L);
            } catch (final InterruptedException ie) {
                // Keep waiting for the result, but remember the interrupt so it is not lost.
                interrupted = true;
            }
        }
    } finally {
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    if (result.getError() != null) {
        throw new IOException(result.getError());
    }
    return result;
}
/**
 * Builds a filter that accepts an event only if it satisfies every criterion of the query:
 * start/end date, min/max file size, and all search terms.
 * <p>
 * A search value containing {@code ?} or {@code *} is treated as a wildcard expression
 * ({@code ?} = any single character, {@code *} = any run of characters) and matched
 * case-insensitively against the whole value. All other characters of the search value are
 * matched literally, so regex metacharacters such as {@code .} or {@code (} in a search
 * value neither change the match nor cause a {@code PatternSyntaxException}.
 *
 * @param query the query whose criteria are evaluated for each event
 * @return the filter
 */
private Filter<ProvenanceEventRecord> createFilter(final Query query) {
    return new Filter<ProvenanceEventRecord>() {
        @Override
        public boolean select(final ProvenanceEventRecord event) {
            if (query.getStartDate() != null && query.getStartDate().getTime() > event.getEventTime()) {
                return false;
            }

            if (query.getEndDate() != null && query.getEndDate().getTime() < event.getEventTime()) {
                return false;
            }

            if (query.getMaxFileSize() != null) {
                final long maxFileSize = DataUnit.parseDataSize(query.getMaxFileSize(), DataUnit.B).longValue();
                if (event.getFileSize() > maxFileSize) {
                    return false;
                }
            }

            if (query.getMinFileSize() != null) {
                final long minFileSize = DataUnit.parseDataSize(query.getMinFileSize(), DataUnit.B).longValue();
                if (event.getFileSize() < minFileSize) {
                    return false;
                }
            }

            for (final SearchTerm searchTerm : query.getSearchTerms()) {
                final SearchableField searchableField = searchTerm.getSearchableField();
                final String searchValue = searchTerm.getValue();
                final boolean wildcard = containsWildcard(searchValue);

                if (searchableField.isAttribute()) {
                    final String attributeName = searchableField.getIdentifier();
                    final String eventAttributeValue = event.getAttributes().get(attributeName);

                    if (wildcard) {
                        // A missing or empty attribute never satisfies a wildcard term, not even "*".
                        if (eventAttributeValue == null || eventAttributeValue.isEmpty()) {
                            return false;
                        }
                        if (!compileWildcardPattern(searchValue).matcher(eventAttributeValue).matches()) {
                            return false;
                        }
                    } else if (!searchValue.equalsIgnoreCase(eventAttributeValue)) {
                        return false;
                    }
                    continue;
                }

                // if FlowFileUUID, search parent & child UUID's also.
                if (searchableField.equals(SearchableFields.FlowFileUUID)) {
                    if (!matchesFlowFileUuid(event, searchValue, wildcard)) {
                        return false;
                    }
                    continue;
                }

                final Object fieldValue = getFieldValue(event, searchableField);
                if (fieldValue == null) {
                    return false;
                }

                final String fieldText = String.valueOf(fieldValue);
                if (wildcard) {
                    if (!compileWildcardPattern(searchValue).matcher(fieldText).matches()) {
                        return false;
                    }
                } else if (!searchValue.equalsIgnoreCase(fieldText)) {
                    return false;
                }
            }

            return true;
        }
    };
}

/** Tells whether the search value uses either wildcard character ({@code ?} or {@code *}). */
private static boolean containsWildcard(final String searchValue) {
    return searchValue.contains("?") || searchValue.contains("*");
}

/**
 * Translates a wildcard expression into a case-insensitive pattern: {@code ?} becomes
 * {@code .}, {@code *} becomes {@code .*}, and every other run of characters is quoted so
 * that it is matched literally.
 */
private static Pattern compileWildcardPattern(final String searchValue) {
    final StringBuilder regex = new StringBuilder();
    final StringBuilder literal = new StringBuilder();

    for (int i = 0; i < searchValue.length(); i++) {
        final char c = searchValue.charAt(i);
        if (c == '?' || c == '*') {
            if (literal.length() > 0) {
                regex.append(Pattern.quote(literal.toString()));
                literal.setLength(0);
            }
            regex.append(c == '?' ? "." : ".*");
        } else {
            literal.append(c);
        }
    }
    if (literal.length() > 0) {
        regex.append(Pattern.quote(literal.toString()));
    }

    return Pattern.compile(regex.toString(), Pattern.CASE_INSENSITIVE);
}

/**
 * Tells whether the search value matches the event's own FlowFile UUID or any of its parent
 * or child UUIDs. Without wildcards the comparison is an exact, case-sensitive match.
 */
private static boolean matchesFlowFileUuid(final ProvenanceEventRecord event, final String searchValue, final boolean wildcard) {
    if (!wildcard) {
        return event.getFlowFileUuid().equals(searchValue)
            || event.getParentUuids().contains(searchValue)
            || event.getChildUuids().contains(searchValue);
    }

    final Pattern pattern = compileWildcardPattern(searchValue);
    if (pattern.matcher(event.getFlowFileUuid()).matches()) {
        return true;
    }
    for (final String uuid : event.getParentUuids()) {
        if (pattern.matcher(uuid).matches()) {
            return true;
        }
    }
    for (final String uuid : event.getChildUuids()) {
        if (pattern.matcher(uuid).matches()) {
            return true;
        }
    }
    return false;
}
/**
 * Reads from the record the value that corresponds to the given searchable field.
 *
 * @param record the event to read from
 * @param field the searchable field to resolve
 * @return the record's value for that field, or {@code null} if the field is not one of the
 *         {@code SearchableFields} constants handled here
 */
private Object getFieldValue(final ProvenanceEventRecord record, final SearchableField field) {
    final Object value;

    if (SearchableFields.AlternateIdentifierURI.equals(field)) {
        value = record.getAlternateIdentifierUri();
    } else if (SearchableFields.ComponentID.equals(field)) {
        value = record.getComponentId();
    } else if (SearchableFields.Details.equals(field)) {
        value = record.getDetails();
    } else if (SearchableFields.EventTime.equals(field)) {
        value = record.getEventTime();
    } else if (SearchableFields.EventType.equals(field)) {
        value = record.getEventType();
    } else if (SearchableFields.Filename.equals(field)) {
        // The filename is not a record property of its own; it is read from the attributes.
        value = record.getAttributes().get(CoreAttributes.FILENAME.key());
    } else if (SearchableFields.FileSize.equals(field)) {
        value = record.getFileSize();
    } else if (SearchableFields.FlowFileUUID.equals(field)) {
        value = record.getFlowFileUuid();
    } else if (SearchableFields.LineageStartDate.equals(field)) {
        value = record.getLineageStartDate();
    } else if (SearchableFields.Relationship.equals(field)) {
        value = record.getRelationship();
    } else if (SearchableFields.TransitURI.equals(field)) {
        value = record.getTransitUri();
    } else {
        value = null;
    }

    return value;
}
/**
 * Validates the query's date range, registers a new submission under the query's identifier
 * and hands the query to the query executor.
 * <p>
 * The submission is always registered before the task is submitted, so it can be retrieved
 * by identifier as soon as the task may be running. If the executor refuses the task, the
 * registration is rolled back so that no never-finishing submission is left behind.
 *
 * @param query the query to run
 * @return the submission through which the result can be obtained
 * @throws IllegalArgumentException if both dates are set and the start date is after the end date
 */
@Override
public QuerySubmission submitQuery(final Query query) {
    if (query.getEndDate() != null && query.getStartDate() != null && query.getStartDate().getTime() > query.getEndDate().getTime()) {
        throw new IllegalArgumentException("Query End Time cannot be before Query Start Time");
    }

    final AsyncQuerySubmission result = new AsyncQuerySubmission(query, 1);
    final String queryId = query.getIdentifier();
    querySubmissionMap.put(queryId, result);

    try {
        queryExecService.submit(new QueryRunnable(ringBuffer, createFilter(query), query.getMaxResults(), result));
    } catch (final RuntimeException e) {
        // e.g. the executor was already shut down: do not leave the submission registered.
        querySubmissionMap.remove(queryId, result);
        throw e;
    }

    return result;
}
/**
 * @param queryIdentifier identifier of a previously submitted query
 * @return the submission registered under that identifier, or {@code null} if the map has none
 */
@Override
public QuerySubmission retrieveQuerySubmission(final String queryIdentifier) {
    final QuerySubmission registered = querySubmissionMap.get(queryIdentifier);
    return registered;
}
/**
 * Computes the FlowFile lineage for a single FlowFile UUID by delegating to the
 * collection-based overload with no event ID.
 *
 * @param flowFileUUID the FlowFile UUID
 * @return the computed lineage
 * @throws IOException if the lineage computation reports an error
 */
public Lineage computeLineage(final String flowFileUUID) throws IOException {
    final Collection<String> singleUuid = Collections.<String>singleton(flowFileUUID);
    return computeLineage(singleUuid, LineageComputationType.FLOWFILE_LINEAGE, null);
}
/**
 * Submits a lineage computation and blocks, polling every 100 ms, until it is finished.
 * <p>
 * The wait is not cancellable: an interrupt received while waiting does not abort the
 * wait, but the thread's interrupt status is restored before this method returns or
 * throws, so callers can still observe it.
 *
 * @param flowFileUuids the FlowFile UUIDs whose related events make up the lineage
 * @param computationType the kind of lineage computation
 * @param eventId the event ID passed on to the submission; may be {@code null}
 * @return a lineage built from the result's nodes and edges
 * @throws IOException if the finished result reports an error
 */
private Lineage computeLineage(final Collection<String> flowFileUuids, final LineageComputationType computationType, final Long eventId) throws IOException {
    final AsyncLineageSubmission submission = submitLineageComputation(flowFileUuids, computationType, eventId);
    final StandardLineageResult result = submission.getResult();

    boolean interrupted = false;
    try {
        while (!result.isFinished()) {
            try {
                Thread.sleep(100L);
            } catch (final InterruptedException ie) {
                // Keep waiting for the result, but remember the interrupt so it is not lost.
                interrupted = true;
            }
        }
    } finally {
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    if (result.getError() != null) {
        throw new IOException(result.getError());
    }
    return new FlowFileLineage(result.getNodes(), result.getEdges());
}
/**
 * Submits a FlowFile-lineage computation for a single FlowFile UUID, with no event ID.
 *
 * @param flowFileUuid the FlowFile UUID
 * @return the registered submission
 */
@Override
public AsyncLineageSubmission submitLineageComputation(final String flowFileUuid) {
    final Collection<String> singleUuid = Collections.singleton(flowFileUuid);
    return submitLineageComputation(singleUuid, LineageComputationType.FLOWFILE_LINEAGE, null);
}
/**
 * @param lineageIdentifier identifier of a previously submitted lineage computation
 * @return the submission registered under that identifier, or {@code null} if the map has none
 */
@Override
public ComputeLineageSubmission retrieveLineageSubmission(String lineageIdentifier) {
    final ComputeLineageSubmission registered = lineageSubmissionMap.get(lineageIdentifier);
    return registered;
}
/**
 * Not supported by this repository.
 *
 * @param identifier unused
 * @return never returns normally
 * @throws UnsupportedOperationException always
 */
public Lineage expandSpawnEventParents(String identifier) throws IOException {
throw new UnsupportedOperationException();
}
/**
 * Resolves each storage location to its event via {@link #getEvent(long)}. Locations whose
 * event is no longer found are skipped, so the result may be shorter than the input.
 *
 * @param storageLocations locations to resolve; each must be an {@code EventIdLocation}
 * @return the events that were found, in input order
 * @throws IllegalArgumentException if a location is not an {@code EventIdLocation}
 * @throws IOException declared by the interface; not thrown by the visible code
 */
@Override
public List<StoredProvenanceEvent> getEvents(final List<StorageLocation> storageLocations) throws IOException {
    final List<StoredProvenanceEvent> resolved = new ArrayList<>(storageLocations.size());

    final Iterator<StorageLocation> remaining = storageLocations.iterator();
    while (remaining.hasNext()) {
        final StorageLocation location = remaining.next();
        if (location instanceof EventIdLocation) {
            final long eventId = ((EventIdLocation) location).getId();
            final StoredProvenanceEvent stored = getEvent(eventId);
            if (stored != null) {
                resolved.add(stored);
            }
        } else {
            throw new IllegalArgumentException("Illegal Storage Location");
        }
    }

    return resolved;
}
/**
 * Resolves a storage location to its event via {@link #getEvent(long)}.
 *
 * @param location the location to resolve; must be an {@code EventIdLocation}
 * @return the event with the location's ID, or {@code null} if it is not found
 * @throws IllegalArgumentException if the location is not an {@code EventIdLocation}
 * @throws IOException declared by the interface; not thrown by the visible code
 */
@Override
public StoredProvenanceEvent getEvent(final StorageLocation location) throws IOException {
    if (location instanceof EventIdLocation) {
        final long eventId = ((EventIdLocation) location).getId();
        return getEvent(eventId);
    }

    throw new IllegalArgumentException("Illegal Storage Location");
}
/**
 * @return the event time of the first event returned by {@code getEvents(0L, 1)}, or
 *         {@code null} if that call returns no events
 * @throws IOException if {@code getEvents} fails
 */
@Override
public Long getEarliestEventTime() throws IOException {
    final List<StoredProvenanceEvent> firstEvent = getEvents(0L, 1);
    return firstEvent.isEmpty() ? null : firstEvent.get(0).getEventTime();
}
/**
 * Submits an "expand parents" lineage computation for the given event.
 * <ul>
 * <li>JOIN, FORK, REPLAY or CLONE event: a lineage computation over the event's parent
 * UUIDs is submitted.</li>
 * <li>Event not found: a submission is registered whose result is updated with an empty
 * event list.</li>
 * <li>Any other event type: a submission is registered whose result carries an error.</li>
 * </ul>
 *
 * @param eventId ID of the event whose parents are to be expanded
 * @return the registered submission
 */
@Override
public ComputeLineageSubmission submitExpandParents(final long eventId) {
    final ProvenanceEventRecord event = getEvent(eventId);

    if (event != null) {
        switch (event.getEventType()) {
            case JOIN:
            case FORK:
            case REPLAY:
            case CLONE:
                return submitLineageComputation(event.getParentUuids(), LineageComputationType.EXPAND_PARENTS, eventId);
            default:
                break;
        }
    }

    // No computation to run: register a submission that is completed right here.
    final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_PARENTS, eventId, Collections.<String>emptyList(), 1);
    lineageSubmissionMap.put(submission.getLineageIdentifier(), submission);

    if (event == null) {
        submission.getResult().update(Collections.<StoredProvenanceEvent>emptyList());
    } else {
        submission.getResult().setError("Event ID " + eventId + " indicates an event of type " + event.getEventType() + " so its parents cannot be expanded");
    }
    return submission;
}
/**
 * Submits an "expand children" lineage computation for the given event.
 * <ul>
 * <li>JOIN, FORK, REPLAY or CLONE event: a lineage computation over the event's child
 * UUIDs is submitted.</li>
 * <li>Event not found: a submission is registered whose result is updated with an empty
 * event list.</li>
 * <li>Any other event type: a submission is registered whose result carries an error.</li>
 * </ul>
 *
 * @param eventId ID of the event whose children are to be expanded
 * @return the registered submission
 */
@Override
public ComputeLineageSubmission submitExpandChildren(final long eventId) {
    final ProvenanceEventRecord event = getEvent(eventId);

    if (event != null) {
        switch (event.getEventType()) {
            case JOIN:
            case FORK:
            case REPLAY:
            case CLONE:
                return submitLineageComputation(event.getChildUuids(), LineageComputationType.EXPAND_CHILDREN, eventId);
            default:
                break;
        }
    }

    // No computation to run: register a submission that is completed right here.
    final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, eventId, Collections.<String>emptyList(), 1);
    lineageSubmissionMap.put(submission.getLineageIdentifier(), submission);

    if (event == null) {
        submission.getResult().update(Collections.<StoredProvenanceEvent>emptyList());
    } else {
        submission.getResult().setError("Event ID " + eventId + " indicates an event of type " + event.getEventType() + " so its children cannot be expanded");
    }
    return submission;
}
/**
 * Registers a lineage submission and hands the computation to the query executor. The
 * computation selects every buffered event whose own FlowFile UUID, or any of its parent or
 * child UUIDs, is contained in {@code flowFileUuids}.
 *
 * @param flowFileUuids the FlowFile UUIDs of interest
 * @param computationType the kind of lineage computation
 * @param eventId the event ID passed on to the submission; may be {@code null}
 * @return the registered submission
 */
private AsyncLineageSubmission submitLineageComputation(final Collection<String> flowFileUuids, final LineageComputationType computationType, final Long eventId) {
    final AsyncLineageSubmission submission = new AsyncLineageSubmission(computationType, eventId, flowFileUuids, 1);
    lineageSubmissionMap.put(submission.getLineageIdentifier(), submission);

    final Filter<StoredProvenanceEvent> relatedToRequestedUuids = new Filter<StoredProvenanceEvent>() {
        @Override
        public boolean select(final StoredProvenanceEvent candidate) {
            return flowFileUuids.contains(candidate.getFlowFileUuid())
                || containsRequested(candidate.getParentUuids())
                || containsRequested(candidate.getChildUuids());
        }

        private boolean containsRequested(final Iterable<String> uuids) {
            for (final String uuid : uuids) {
                if (flowFileUuids.contains(uuid)) {
                    return true;
                }
            }
            return false;
        }
    };

    queryExecService.submit(new ComputeLineageRunnable(ringBuffer, relatedToRequestedUuids, submission));
    return submission;
}
/**
 * Task that scans the ring buffer backwards, keeps the first {@code maxRecords} events the
 * filter selects, counts all selected events, and publishes both to the submission's result.
 */
private static class QueryRunnable implements Runnable {
    private final RingBuffer<StoredProvenanceEvent> ringBuffer;
    private final Filter<ProvenanceEventRecord> filter;
    private final AsyncQuerySubmission submission;
    private final int maxRecords;

    public QueryRunnable(final RingBuffer<StoredProvenanceEvent> ringBuffer, final Filter<ProvenanceEventRecord> filter, final int maxRecords, final AsyncQuerySubmission submission) {
        this.ringBuffer = ringBuffer;
        this.filter = filter;
        this.maxRecords = maxRecords;
        this.submission = submission;
    }

    @Override
    public void run() {
        // Retrieve the most recent results and count the total number of matches
        final IntegerHolder totalMatches = new IntegerHolder(0);
        final List<StoredProvenanceEvent> retained = new ArrayList<>(maxRecords);

        final ForEachEvaluator<StoredProvenanceEvent> collector = new ForEachEvaluator<StoredProvenanceEvent>() {
            @Override
            public boolean evaluate(final StoredProvenanceEvent candidate) {
                // The counter is advanced only for selected events; the list stops growing at maxRecords.
                if (filter.select(candidate) && totalMatches.incrementAndGet() <= maxRecords) {
                    retained.add(candidate);
                }
                // Always true: every element is evaluated so that the total count is complete.
                return true;
            }
        };
        ringBuffer.forEach(collector, IterationDirection.BACKWARD);

        submission.getResult().update(retained, totalMatches.get());
    }
}
/**
 * Task that collects every ring-buffer event the filter selects and publishes the list to
 * the lineage submission's result.
 */
private static class ComputeLineageRunnable implements Runnable {
    private final RingBuffer<StoredProvenanceEvent> ringBuffer;
    private final Filter<StoredProvenanceEvent> filter;
    private final AsyncLineageSubmission submission;

    public ComputeLineageRunnable(final RingBuffer<StoredProvenanceEvent> ringBuffer, final Filter<StoredProvenanceEvent> filter, final AsyncLineageSubmission submission) {
        this.submission = submission;
        this.filter = filter;
        this.ringBuffer = ringBuffer;
    }

    @Override
    @SuppressWarnings({ "unchecked", "rawtypes" })
    public void run() {
        final List<StoredProvenanceEvent> selected = ringBuffer.getSelectedElements(filter);
        final StandardLineageResult lineageResult = submission.getResult();
        // Raw cast kept from the original call site; update() is declared outside this file.
        lineageResult.update((List) selected);
    }
}
/**
 * Maintenance task that evicts finished submissions whose result has expired, from both the
 * query submission map and the lineage submission map.
 */
private class RemoveExpiredQueryResults implements Runnable {
    @Override
    public void run() {
        final Date now = new Date();

        final Iterator<Map.Entry<String, AsyncQuerySubmission>> queryIterator = querySubmissionMap.entrySet().iterator();
        while (queryIterator.hasNext()) {
            final StandardQueryResult result = queryIterator.next().getValue().getResult();
            if (result.isFinished() && result.getExpiration().before(now)) {
                queryIterator.remove();
            }
        }

        final Iterator<Map.Entry<String, AsyncLineageSubmission>> lineageIterator = lineageSubmissionMap.entrySet().iterator();
        while (lineageIterator.hasNext()) {
            final StandardLineageResult result = lineageIterator.next().getValue().getResult();
            if (result.isFinished() && result.getExpiration().before(now)) {
                // Removes from the lineage map itself, so expired lineage submissions are actually evicted.
                lineageIterator.remove();
            }
        }
    }
}
/**
 * Text-based provenance queries are not supported by this repository.
 *
 * @param query the query text; unused
 * @return never returns normally
 * @throws UnsupportedOperationException always
 */
@Override
public ProvenanceQuerySubmission submitQuery(final String query) {
throw new UnsupportedOperationException();
// NOTE(review): the disabled implementation below is kept for reference. Before re-enabling it, check:
// - the eviction loop removes entries whose expiration is after(now), i.e. the ones that have NOT expired yet;
// - the expiration adds TimeUnit.MINUTES.toNanos(10) to a millisecond timestamp (units are mixed);
// - it uses UUID, which is not among the imports visible in this file.
/*
if ( provQuerySubmissionMap.size() > MAX_CONCURRENT_QUERIES ) {
final List<String> toRemove = new ArrayList<>();
final Date now = new Date();
for ( final Map.Entry<String, ProvenanceQuerySubmission> entry : provQuerySubmissionMap.entrySet() ) {
if ( entry.getValue().getResult().getExpiration().after(now) ) {
toRemove.add(entry.getKey());
}
}
for ( final String id : toRemove ) {
provQuerySubmissionMap.remove(id);
}
if ( provQuerySubmissionMap.size() > MAX_CONCURRENT_QUERIES ) {
throw new IllegalStateException("There are already " + MAX_CONCURRENT_QUERIES + " outstanding queries for this Provenance Repository; cannot perform any more queries until the existing queries are expired or canceled");
}
}
final Iterator<StoredProvenanceEvent> eventItr = ringBuffer.asList().iterator();
final ProvenanceResultSet rs = ProvenanceQuery.compile(query, getSearchableFields(), getSearchableAttributes()).evaluate(eventItr);
final Date submissionTime = new Date();
final String queryId = UUID.randomUUID().toString();
final Date expiration = new Date(System.currentTimeMillis() + TimeUnit.MINUTES.toNanos(10));
final ProvenanceQuerySubmission submission = new ProvenanceQuerySubmission() {
private final AtomicBoolean canceled = new AtomicBoolean(false);
@Override
public String getQuery() {
return query;
}
@Override
public ProvenanceQueryResult getResult() {
return new ProvenanceQueryResult() {
@Override
public ProvenanceResultSet getResultSet() {
return rs;
}
@Override
public Date getExpiration() {
return expiration;
}
@Override
public String getError() {
return null;
}
@Override
public int getPercentComplete() {
return 100;
}
@Override
public boolean isFinished() {
return true;
}
};
}
@Override
public Date getSubmissionTime() {
return submissionTime;
}
@Override
public String getQueryIdentifier() {
return queryId;
}
@Override
public void cancel() {
canceled.set(true);
provQuerySubmissionMap.remove(queryId);
}
@Override
public boolean isCanceled() {
return canceled.get();
}
};
provQuerySubmissionMap.putIfAbsent(queryId, submission);
return submission;
*/
}
/**
 * Text-based provenance queries are not supported by this repository, so there is never a
 * submission to retrieve.
 *
 * @param queryIdentifier unused
 * @return never returns normally
 * @throws UnsupportedOperationException always
 */
@Override
public ProvenanceQuerySubmission retrieveProvenanceQuerySubmission(final String queryIdentifier) {
throw new UnsupportedOperationException();
// The disabled lookup below belongs to the disabled text-query implementation in submitQuery(String).
/*
return provQuerySubmissionMap.get(queryIdentifier);
*/
}
}