blob: af5fe5079f947a798e877a85bb3d9517ed454b00 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.provenance.lucene;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.StandardProvenanceEventRecord;
import org.apache.nifi.provenance.serialization.RecordReader;
import org.apache.nifi.provenance.serialization.RecordReaders;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.TopDocs;
/**
 * Resolves Lucene index documents to the provenance event records they point at.
 *
 * <p>Each document is expected to carry a {@link FieldNames#STORAGE_FILENAME} field and a
 * {@link FieldNames#STORAGE_FILE_OFFSET} field. Those two values are used to locate the
 * provenance log file and the position within it from which the record is read.
 */
public class DocsReader {

    /**
     * Creates a new reader.
     *
     * @param storageDirectories not used by this implementation; the parameter is kept so that
     *            existing callers continue to compile
     */
    public DocsReader(final List<File> storageDirectories) {
    }

    /**
     * Loads the documents referenced by {@code topDocs} from {@code indexReader} and reads the
     * provenance events they refer to.
     *
     * @param topDocs the search hits to resolve
     * @param indexReader the index from which the hit documents are loaded
     * @param allProvenanceLogFiles the provenance log files that may contain the events
     * @param retrievalCount running count of retrieved events; incremented once per event read
     * @param maxResults once {@code retrievalCount} reaches this value no further events are read
     * @return the events that were read, or an empty set if {@code retrievalCount} had already
     *         reached {@code maxResults} on entry
     * @throws IOException if a document cannot be loaded or an event cannot be read
     */
    public Set<ProvenanceEventRecord> read(final TopDocs topDocs, final IndexReader indexReader, final Collection<Path> allProvenanceLogFiles, final AtomicInteger retrievalCount, final int maxResults) throws IOException {
        if (retrievalCount.get() >= maxResults) {
            return Collections.emptySet();
        }

        final List<Document> docs = new ArrayList<>();
        for (final ScoreDoc scoreDoc : topDocs.scoreDocs) {
            docs.add(indexReader.document(scoreDoc.doc));
        }

        return read(docs, allProvenanceLogFiles, retrievalCount, maxResults);
    }

    /**
     * Reads the provenance events referenced by the given documents.
     *
     * <p>{@code docs} is handed to {@link LuceneUtil#sortDocsForRetrieval} before reading, so the
     * caller's list may be reordered. While consecutive documents refer to the same storage file
     * at an increasing offset, the already-open reader is advanced instead of being reopened.
     *
     * <p>Reading stops as soon as {@code retrievalCount} reaches {@code maxResults}. The count is
     * only checked after an event has been read, so at least one event is read when {@code docs}
     * is non-empty.
     *
     * @param docs the index documents to resolve
     * @param allProvenanceLogFiles the provenance log files that may contain the events
     * @param retrievalCount running count of retrieved events; incremented once per event read
     * @param maxResults once {@code retrievalCount} reaches this value no further events are read
     * @return the events that were read, in the order in which they were read
     * @throws FileNotFoundException if zero or more than one provenance log file matches the
     *             storage filename recorded in a document
     * @throws IOException if an event cannot be read from its provenance log file
     */
    public Set<ProvenanceEventRecord> read(final List<Document> docs, final Collection<Path> allProvenanceLogFiles, final AtomicInteger retrievalCount, final int maxResults) throws IOException {
        LuceneUtil.sortDocsForRetrieval(docs);

        RecordReader reader = null;
        String lastStorageFilename = null;
        long lastByteOffset = 0L;
        final Set<ProvenanceEventRecord> matchingRecords = new LinkedHashSet<>();

        try {
            for (final Document d : docs) {
                final String storageFilename = d.getField(FieldNames.STORAGE_FILENAME).stringValue();
                final long byteOffset = d.getField(FieldNames.STORAGE_FILE_OFFSET).numericValue().longValue();

                try {
                    if (reader != null && storageFilename.equals(lastStorageFilename) && byteOffset > lastByteOffset) {
                        // Still the same file and the offset is downstream.
                        try {
                            reader.skipTo(byteOffset);
                            final StandardProvenanceEventRecord record = reader.nextRecord();
                            matchingRecords.add(record);
                        } catch (final IOException e) {
                            // The file was found and opened earlier, so this is a read failure and not a
                            // missing file; report it as such and keep the underlying cause.
                            throw new IOException("Failed to retrieve record at offset " + byteOffset + " from Provenance Log File with basename "
                                    + storageFilename + " due to " + e, e);
                        }
                    } else {
                        if (reader != null) {
                            // Drop the reference before closing so that a failure below cannot cause the
                            // outer finally block to close the same reader a second time.
                            final RecordReader previousReader = reader;
                            reader = null;
                            previousReader.close();
                        }

                        final List<File> potentialFiles = LuceneUtil.getProvenanceLogFiles(storageFilename, allProvenanceLogFiles);
                        if (potentialFiles.isEmpty()) {
                            throw new FileNotFoundException("Could not find Provenance Log File with basename " + storageFilename + " in the Provenance Repository");
                        }
                        if (potentialFiles.size() > 1) {
                            throw new FileNotFoundException("Found multiple Provenance Log Files with basename " + storageFilename + " in the Provenance Repository");
                        }

                        // Exactly one candidate remains after the checks above.
                        final File file = potentialFiles.get(0);
                        reader = RecordReaders.newRecordReader(file, allProvenanceLogFiles);
                        try {
                            reader.skip(byteOffset);
                            final StandardProvenanceEventRecord record = reader.nextRecord();
                            matchingRecords.add(record);
                        } catch (final IOException e) {
                            throw new IOException("Failed to retrieve record from Provenance File " + file + " due to " + e, e);
                        }
                    }

                    // Checked once per document, at the level of the document loop, so that reaching the
                    // limit stops retrieval regardless of which branch read the record.
                    if (retrievalCount.incrementAndGet() >= maxResults) {
                        break;
                    }
                } finally {
                    lastStorageFilename = storageFilename;
                    lastByteOffset = byteOffset;
                }
            }
        } finally {
            if (reader != null) {
                reader.close();
            }
        }

        return matchingRecords;
    }
}