// blob: b008709ee3107e125fe7b4189dd2e079c99b0420
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.provenance.serialization;
import org.apache.nifi.provenance.ByteArraySchemaRecordReader;
import org.apache.nifi.provenance.ByteArraySchemaRecordWriter;
import org.apache.nifi.provenance.EncryptedSchemaRecordReader;
import org.apache.nifi.provenance.EventIdFirstSchemaRecordReader;
import org.apache.nifi.provenance.EventIdFirstSchemaRecordWriter;
import org.apache.nifi.provenance.StandardRecordReader;
import org.apache.nifi.provenance.lucene.LuceneUtil;
import org.apache.nifi.provenance.toc.StandardTocReader;
import org.apache.nifi.provenance.toc.TocReader;
import org.apache.nifi.provenance.toc.TocUtil;
import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Path;
import java.util.Collection;
import java.util.zip.GZIPInputStream;
public class RecordReaders {

    private RecordReaders() {
        // Utility class with only static factory methods; not intended for instantiation.
    }

    /**
     * Creates a new Record Reader that is capable of reading Provenance Event Journals.
     *
     * <p>If the given file no longer exists (e.g. because a rollover action renamed or compressed it),
     * this method searches {@code provenanceLogFiles} — and then the file's own directory with the
     * {@code .prov.gz} / {@code .prov} extensions — for the journal's current name before giving up.</p>
     *
     * @param file the Provenance Event Journal to read data from
     * @param provenanceLogFiles collection of all provenance journal files; may be {@code null},
     *            in which case only the file's own directory is searched for renamed variants
     * @param maxAttributeChars the maximum number of characters to retrieve for any one attribute. This allows us to avoid
     *            issues where a FlowFile has an extremely large attribute and reading events
     *            for that FlowFile results in loading that attribute into memory many times, exhausting the Java Heap
     * @return a Record Reader capable of reading Provenance Event Journals
     * @throws FileNotFoundException if the journal cannot be located under any known name, or if the
     *             serialization format requires a TOC file that does not exist
     * @throws IOException if unable to create a Record Reader for the given file, or if the file was
     *             written using an unknown serializer
     */
    public static RecordReader newRecordReader(File file, final Collection<Path> provenanceLogFiles, final int maxAttributeChars) throws IOException {
        final File originalFile = file;
        InputStream fis = null;
        try {
            if (!file.exists()) {
                // The journal may have been renamed (e.g. compressed) since the caller resolved its path;
                // look for any known provenance log file that shares the same base name.
                if (provenanceLogFiles != null) {
                    final String baseName = LuceneUtil.substringBefore(file.getName(), ".") + ".";
                    for (final Path path : provenanceLogFiles) {
                        if (path.toFile().getName().startsWith(baseName)) {
                            file = path.toFile();
                            break;
                        }
                    }
                }
            }
            if (file.exists()) {
                try {
                    fis = new FileInputStream(file);
                } catch (final FileNotFoundException fnfe) {
                    // File disappeared between the exists() check and opening it; fall through to the retry loop.
                    fis = null;
                }
            }
            String filename = file.getName();
            openStream:
            while (fis == null) {
                final File dir = file.getParentFile();
                final String baseName = LuceneUtil.substringBefore(file.getName(), ".prov");
                // depending on which rollover actions have occurred, we could have 2 possibilities for the
                // filename that we need. The majority of the time, we will use the extension ".prov.gz"
                // because most often we are compressing on rollover and most often we have already finished
                // compressing by the time that we are querying the data.
                for (final String extension : new String[]{".prov.gz", ".prov"}) {
                    file = new File(dir, baseName + extension);
                    if (file.exists()) {
                        try {
                            fis = new FileInputStream(file);
                            filename = baseName + extension;
                            break openStream;
                        } catch (final FileNotFoundException fnfe) {
                            // file was modified by a RolloverAction after we verified that it exists but before we could
                            // create an InputStream for it. Start over.
                            fis = null;
                            continue openStream;
                        }
                    }
                }
                break;
            }
            if (fis == null) {
                throw new FileNotFoundException("Unable to locate file " + originalFile);
            }
            final File tocFile = TocUtil.getTocFile(file);
            final InputStream bufferedInStream = new BufferedInputStream(fis);
            final String serializationName;
            try {
                // Sniff the serializer name from the header, then reset so the chosen reader
                // sees the stream from the beginning. 4096 bytes is ample for the UTF-encoded name.
                bufferedInStream.mark(4096);
                final InputStream in = filename.endsWith(".gz") ? new GZIPInputStream(bufferedInStream) : bufferedInStream;
                final DataInputStream dis = new DataInputStream(in);
                serializationName = dis.readUTF();
                bufferedInStream.reset();
            } catch (final EOFException eof) {
                // A zero-length / truncated journal has no header; treat it as containing no events.
                fis.close();
                return new EmptyRecordReader();
            }
            switch (serializationName) {
                case StandardRecordReader.SERIALIZATION_NAME: {
                    if (tocFile.exists()) {
                        final TocReader tocReader = new StandardTocReader(tocFile);
                        boolean success = false;
                        try {
                            final RecordReader reader = new StandardRecordReader(bufferedInStream, filename, tocReader, maxAttributeChars);
                            success = true;
                            return reader;
                        } finally {
                            // If reader construction failed, release the TOC Reader's file handle;
                            // the outer catch closes only fis.
                            if (!success) {
                                closeQuietly(tocReader);
                            }
                        }
                    } else {
                        return new StandardRecordReader(bufferedInStream, filename, maxAttributeChars);
                    }
                }
                case ByteArraySchemaRecordWriter.SERIALIZATION_NAME: {
                    if (tocFile.exists()) {
                        final TocReader tocReader = new StandardTocReader(tocFile);
                        boolean success = false;
                        try {
                            final RecordReader reader = new ByteArraySchemaRecordReader(bufferedInStream, filename, tocReader, maxAttributeChars);
                            success = true;
                            return reader;
                        } finally {
                            if (!success) {
                                closeQuietly(tocReader);
                            }
                        }
                    } else {
                        return new ByteArraySchemaRecordReader(bufferedInStream, filename, maxAttributeChars);
                    }
                }
                case EventIdFirstSchemaRecordWriter.SERIALIZATION_NAME: {
                    if (!tocFile.exists()) {
                        throw new FileNotFoundException("Cannot create TOC Reader because the file " + tocFile + " does not exist");
                    }
                    final TocReader tocReader = new StandardTocReader(tocFile);
                    boolean success = false;
                    try {
                        final RecordReader reader = new EventIdFirstSchemaRecordReader(bufferedInStream, filename, tocReader, maxAttributeChars);
                        success = true;
                        return reader;
                    } finally {
                        if (!success) {
                            closeQuietly(tocReader);
                        }
                    }
                }
                case EncryptedSchemaRecordReader.SERIALIZATION_NAME: {
                    if (!tocFile.exists()) {
                        throw new FileNotFoundException("Cannot create TOC Reader because the file " + tocFile + " does not exist");
                    }
                    final TocReader tocReader = new StandardTocReader(tocFile);
                    boolean success = false;
                    try {
                        // Return a reader with no eventEncryptor because this method contract cannot change, then inject the encryptor from the writer in the calling method
                        final RecordReader reader = new EncryptedSchemaRecordReader(bufferedInStream, filename, tocReader, maxAttributeChars, null);
                        success = true;
                        return reader;
                    } finally {
                        if (!success) {
                            closeQuietly(tocReader);
                        }
                    }
                }
                default: {
                    throw new IOException("Unable to read data from file " + file + " because the file was written using an unknown Serializer: " + serializationName);
                }
            }
        } catch (final IOException ioe) {
            if (fis != null) {
                try {
                    fis.close();
                } catch (final IOException inner) {
                    ioe.addSuppressed(inner);
                }
            }
            throw ioe;
        }
    }

    /**
     * Best-effort close of a TOC Reader on a failure path, so that the exception that caused
     * the failure — not a secondary close failure — is the one that propagates to the caller.
     *
     * @param tocReader the TOC Reader to close
     */
    private static void closeQuietly(final TocReader tocReader) {
        try {
            tocReader.close();
        } catch (final IOException ignored) {
            // Suppressed intentionally: we are already unwinding from a more relevant exception.
        }
    }
}