/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.provenance.journaling.index;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field.Store;
import org.apache.lucene.document.LongField;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.ConcurrentMergeScheduler;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.BooleanClause;
import org.apache.lucene.search.BooleanClause.Occur;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.NumericRangeQuery;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.util.Version;

import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.provenance.ProvenanceEventRepository;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.provenance.SearchableFields;
import org.apache.nifi.provenance.journaling.JournaledProvenanceEvent;
import org.apache.nifi.provenance.journaling.JournaledStorageLocation;
import org.apache.nifi.provenance.journaling.config.JournalingRepositoryConfig;
import org.apache.nifi.provenance.search.SearchableField;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
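
/**
 * An {@link EventIndexWriter} backed by a Lucene index: provenance events are converted into
 * Lucene Documents so that they can be searched by field and attribute values, and so that each
 * event can be located on disk via the container, section, and journal information that is
 * stored alongside it.
 */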
public class LuceneIndexWriter implements EventIndexWriter {
    private static final Store STORE_FIELDS = Store.YES;
    private static final Logger logger = LoggerFactory.getLogger(LuceneIndexWriter.class);

    private final Set<SearchableField> nonAttributeSearchableFields;
    private final Set<SearchableField> attributeSearchableFields;
    private final File indexDir;
    private final ProvenanceEventRepository repo;

    private final Directory directory;
    private final Analyzer analyzer;
    private final IndexWriter indexWriter;
    private final AtomicLong indexMaxId = new AtomicLong(-1L);
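
    /**
     * Creates a writer that indexes events into a Lucene index in the given directory.
     * The sets of searchable attributes and fields are taken from the given config, and the
     * number of concurrent merge threads is derived from the configured worker thread pool size.
     */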
    public LuceneIndexWriter(final ProvenanceEventRepository repo, final File indexDir, final JournalingRepositoryConfig config) throws IOException {
        this.repo = repo;
        this.indexDir = indexDir;

        attributeSearchableFields = Collections.unmodifiableSet(new HashSet<>(config.getSearchableAttributes()));
        nonAttributeSearchableFields = Collections.unmodifiableSet(new HashSet<>(config.getSearchableFields()));

        directory = FSDirectory.open(indexDir);
        analyzer = new StandardAnalyzer();
        final IndexWriterConfig writerConfig = new IndexWriterConfig(Version.LATEST, analyzer);

        // Increase number of concurrent merges since we are on SSD:
        final ConcurrentMergeScheduler cms = new ConcurrentMergeScheduler();
        writerConfig.setMergeScheduler(cms);
        final int mergeThreads = Math.max(2, Math.min(4, config.getWorkerThreadPoolSize() / 2));
        cms.setMaxMergesAndThreads(mergeThreads, mergeThreads);

        indexWriter = new IndexWriter(directory, writerConfig);
    }
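
    /**
     * Returns a searcher for this writer's index. The searcher is backed by a near-real-time
     * DirectoryReader, so it also sees documents that have been added but not yet committed.
     */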
    @Override
    public EventIndexSearcher newIndexSearcher() throws IOException {
        logger.trace("Creating index searcher for {}", indexWriter);

        final DirectoryReader reader = DirectoryReader.open(indexWriter, false);
        return new LuceneIndexSearcher(repo, reader, indexDir);
    }
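
    /**
     * Closes the underlying IndexWriter, Analyzer, and Directory. If more than one of them
     * fails to close, the earlier failure is attached to the thrown exception as a suppressed
     * exception rather than being lost.
     */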
    @Override
    public void close() throws IOException {
        IOException suppressed = null;
        try {
            indexWriter.close();
        } catch (final IOException ioe) {
            suppressed = ioe;
        }

        analyzer.close();

        try {
            directory.close();
        } catch (final IOException ioe) {
            if (suppressed != null) {
                ioe.addSuppressed(suppressed);
            }
            throw ioe;
        }

        // If closing the writer failed but closing the directory did not, don't swallow the failure.
        if (suppressed != null) {
            throw suppressed;
        }
    }
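
    /**
     * Adds the given value to the Document (lowercased), but only if the value is non-null and
     * the field is either an attribute or one of the configured non-attribute searchable fields.
     */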
    private void addField(final Document doc, final SearchableField field, final String value, final Store store) {
        if (value == null || (!nonAttributeSearchableFields.contains(field) && !field.isAttribute())) {
            return;
        }

        doc.add(new StringField(field.getSearchableFieldName(), value.toLowerCase(), store));
    }
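
    /**
     * Converts each event into a Lucene Document that indexes the configured searchable fields
     * and attributes along with the event's storage location, adds all of the Documents to the
     * index, and then updates the maximum event ID that this index has seen.
     */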
    @Override
    public void index(final Collection<JournaledProvenanceEvent> events) throws IOException {
        long maxId = this.indexMaxId.get();
        final long startNanos = System.nanoTime();

        final List<Document> documents = new ArrayList<>(events.size());
        for (final JournaledProvenanceEvent event : events) {
            maxId = event.getEventId();

            final Map<String, String> attributes = event.getAttributes();
            final Document doc = new Document();
            addField(doc, SearchableFields.FlowFileUUID, event.getFlowFileUuid(), STORE_FIELDS);
            addField(doc, SearchableFields.Filename, attributes.get(CoreAttributes.FILENAME.key()), STORE_FIELDS);
            addField(doc, SearchableFields.ComponentID, event.getComponentId(), STORE_FIELDS);
            addField(doc, SearchableFields.ComponentType, event.getComponentType(), STORE_FIELDS);
            addField(doc, SearchableFields.AlternateIdentifierURI, event.getAlternateIdentifierUri(), STORE_FIELDS);
            addField(doc, SearchableFields.EventType, event.getEventType().name(), STORE_FIELDS);
            addField(doc, SearchableFields.Relationship, event.getRelationship(), STORE_FIELDS);
            addField(doc, SearchableFields.Details, event.getDetails(), STORE_FIELDS);
            addField(doc, SearchableFields.ContentClaimSection, event.getContentClaimSection(), STORE_FIELDS);
            addField(doc, SearchableFields.ContentClaimContainer, event.getContentClaimContainer(), STORE_FIELDS);
            addField(doc, SearchableFields.ContentClaimIdentifier, event.getContentClaimIdentifier(), STORE_FIELDS);
            addField(doc, SearchableFields.SourceQueueIdentifier, event.getSourceQueueIdentifier(), STORE_FIELDS);

            if (nonAttributeSearchableFields.contains(SearchableFields.TransitURI)) {
                addField(doc, SearchableFields.TransitURI, event.getTransitUri(), STORE_FIELDS);
            }

            for (final SearchableField searchableField : attributeSearchableFields) {
                addField(doc, searchableField, attributes.get(searchableField.getSearchableFieldName()), STORE_FIELDS);
            }

            // Index the fields that we always index.
            doc.add(new LongField(SearchableFields.LineageStartDate.getSearchableFieldName(), event.getLineageStartDate(), STORE_FIELDS));
            doc.add(new LongField(SearchableFields.EventTime.getSearchableFieldName(), event.getEventTime(), STORE_FIELDS));
            doc.add(new LongField(SearchableFields.FileSize.getSearchableFieldName(), event.getFileSize(), STORE_FIELDS));

            final JournaledStorageLocation location = event.getStorageLocation();
            doc.add(new StringField(IndexedFieldNames.CONTAINER_NAME, location.getContainerName(), Store.YES));
            doc.add(new StringField(IndexedFieldNames.SECTION_NAME, location.getSectionName(), Store.YES));
            doc.add(new LongField(IndexedFieldNames.JOURNAL_ID, location.getJournalId(), Store.YES));
            doc.add(new LongField(IndexedFieldNames.BLOCK_INDEX, location.getBlockIndex(), Store.YES));
            doc.add(new LongField(IndexedFieldNames.EVENT_ID, location.getEventId(), Store.YES));

            if (nonAttributeSearchableFields.contains(SearchableFields.LineageIdentifier)) {
                for (final String lineageIdentifier : event.getLineageIdentifiers()) {
                    addField(doc, SearchableFields.LineageIdentifier, lineageIdentifier, STORE_FIELDS);
                }
            }

            // If the event is a FORK, CLONE, or REPLAY, index the FlowFile UUIDs of all children;
            // if it is a JOIN, index the UUIDs of all parents.
            if (event.getEventType() == ProvenanceEventType.FORK || event.getEventType() == ProvenanceEventType.CLONE || event.getEventType() == ProvenanceEventType.REPLAY) {
                for (final String uuid : event.getChildUuids()) {
                    if (!uuid.equals(event.getFlowFileUuid())) {
                        addField(doc, SearchableFields.FlowFileUUID, uuid, STORE_FIELDS);
                    }
                }
            } else if (event.getEventType() == ProvenanceEventType.JOIN) {
                for (final String uuid : event.getParentUuids()) {
                    if (!uuid.equals(event.getFlowFileUuid())) {
                        addField(doc, SearchableFields.FlowFileUUID, uuid, STORE_FIELDS);
                    }
                }
            } else if (event.getEventType() == ProvenanceEventType.RECEIVE && event.getSourceSystemFlowFileIdentifier() != null) {
                // If we get a RECEIVE event with a Source System FlowFile Identifier, we also index the UUID
                // that the Source System uses to refer to the data.
                final String sourceIdentifier = event.getSourceSystemFlowFileIdentifier();
                final String sourceFlowFileUUID;
                final int lastColon = sourceIdentifier.lastIndexOf(":");
                if (lastColon > -1 && lastColon < sourceIdentifier.length() - 2) {
                    sourceFlowFileUUID = sourceIdentifier.substring(lastColon + 1);
                } else {
                    sourceFlowFileUUID = null;
                }

                if (sourceFlowFileUUID != null) {
                    addField(doc, SearchableFields.FlowFileUUID, sourceFlowFileUUID, STORE_FIELDS);
                }
            }

            documents.add(doc);
        }

        indexWriter.addDocuments(documents);

        // Update the index's max event ID, using compare-and-set in case other threads are also updating it.
        boolean updated = false;
        do {
            final long curMax = indexMaxId.get();
            if (maxId > curMax) {
                updated = indexMaxId.compareAndSet(curMax, maxId);
            } else {
                updated = true;
            }
        } while (!updated);

        final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
        logger.debug("Indexed {} events in {} millis with {}", events.size(), millis, this);
    }
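
    /**
     * Deletes all documents that were indexed for the given container, section, and journal ID.
     */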
    @Override
    public void delete(final String containerName, final String section, final Long journalId) throws IOException {
        final BooleanQuery query = new BooleanQuery();
        query.add(new BooleanClause(new TermQuery(new Term(IndexedFieldNames.CONTAINER_NAME, containerName)), Occur.MUST));
        query.add(new BooleanClause(new TermQuery(new Term(IndexedFieldNames.SECTION_NAME, section)), Occur.MUST));
        query.add(NumericRangeQuery.newLongRange(IndexedFieldNames.JOURNAL_ID, journalId, journalId, true, true), Occur.MUST);

        final long start = System.nanoTime();
        indexWriter.deleteDocuments(query);
        final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        logger.info("Deleted events from {} that matched container={}, section={}, journal={} in {} millis", indexWriter, containerName, section, journalId, millis);
    }
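
    /**
     * Deletes all documents for the given container and section whose journal ID is less than
     * the given journal ID.
     */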
    @Override
    public void deleteEventsBefore(final String containerName, final String section, final Long journalId) throws IOException {
        final BooleanQuery query = new BooleanQuery();
        query.add(new BooleanClause(new TermQuery(new Term(IndexedFieldNames.CONTAINER_NAME, containerName)), Occur.MUST));
        query.add(new BooleanClause(new TermQuery(new Term(IndexedFieldNames.SECTION_NAME, section)), Occur.MUST));
        query.add(NumericRangeQuery.newLongRange(IndexedFieldNames.JOURNAL_ID, 0L, journalId, true, false), Occur.MUST);

        final long start = System.nanoTime();
        indexWriter.deleteDocuments(query);
        final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        logger.info("Deleted events from {} that matched container={}, section={}, journal less than {} in {} millis", indexWriter, containerName, section, journalId, millis);
    }
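
    /**
     * Deletes all documents whose event time is at or before the given timestamp.
     */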
    @Override
    public void deleteOldEvents(final long earliestEventTimeToDelete) throws IOException {
        final Query query = NumericRangeQuery.newLongRange(SearchableFields.EventTime.getSearchableFieldName(), 0L, earliestEventTimeToDelete, true, true);

        final long start = System.nanoTime();
        indexWriter.deleteDocuments(query);
        final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        logger.info("Deleted events from {} that occurred before {}; deletion took {} millis", this, new Date(earliestEventTimeToDelete), millis);
    }
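
    /**
     * Commits all pending changes to the index so that they are durable on disk.
     */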
    @Override
    public void sync() throws IOException {
        final long start = System.nanoTime();
        indexWriter.commit();
        final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        logger.info("Successfully sync'ed {} in {} millis", this, millis);
    }

    @Override
    public String toString() {
        return "LuceneIndexWriter[indexDir=" + indexDir + "]";
    }
}