blob: daf12cff17a96dc45f849118d4a1f502c17ecfcb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.provenance;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collection;
import java.util.Optional;
import org.apache.nifi.provenance.schema.LookupTableEventRecord;
import org.apache.nifi.provenance.toc.TocReader;
import org.apache.nifi.repository.encryption.RepositoryEncryptor;
import org.apache.nifi.repository.schema.Record;
import org.apache.nifi.stream.io.LimitingInputStream;
import org.apache.nifi.stream.io.StreamUtils;
/**
 * Schema-based provenance record reader whose serialized event payloads are encrypted.
 * <p>
 * Each record is read as a 4-byte event ID offset (added to {@link #getFirstEventId()}), a 4-byte
 * payload length, and then the payload itself. The payload is decrypted with the configured
 * {@link RepositoryEncryptor}, using the event ID (as a decimal string) as the record identifier,
 * and the resulting plaintext is deserialized with the schema record reader supplied by the
 * parent class.
 */
public class EncryptedSchemaRecordReader extends EventIdFirstSchemaRecordReader {
    /**
     * Serialization name for this reader. NOTE(review): the value names the writer, presumably so
     * that it matches the name recorded by the corresponding writer; confirm against the writer.
     */
    public static final String SERIALIZATION_NAME = "EncryptedSchemaRecordWriter";

    // Not final: it can be replaced after construction through setRepositoryEncryptor().
    private RepositoryEncryptor<byte[], byte[]> repositoryEncryptor;

    /**
     * Creates a reader over the given stream.
     *
     * @param inputStream stream containing the serialized events
     * @param filename name of the file being read
     * @param tocReader table-of-contents reader passed to the parent reader
     * @param maxAttributeChars maximum attribute length passed to the parent reader
     * @param repositoryEncryptor encryptor used to decrypt each record; may be supplied later via
     *            {@link #setRepositoryEncryptor(RepositoryEncryptor)}
     * @throws IOException if the parent reader cannot be initialized
     */
    public EncryptedSchemaRecordReader(final InputStream inputStream, final String filename, final TocReader tocReader, final int maxAttributeChars,
            final RepositoryEncryptor<byte[], byte[]> repositoryEncryptor) throws IOException {
        super(inputStream, filename, tocReader, maxAttributeChars);
        this.repositoryEncryptor = repositoryEncryptor;
    }

    /**
     * Reads the record header (event ID offset and payload length) and then decrypts and
     * deserializes the record that follows it.
     *
     * @param in stream positioned at the start of a record
     * @param serializationVersion serialization version, verified before anything is read
     * @return the deserialized event, or {@code null} if the schema reader produced no record
     * @throws IOException if the record cannot be read or its length is invalid
     */
    @Override
    protected StandardProvenanceEventRecord nextRecord(final DataInputStream in, final int serializationVersion) throws IOException {
        verifySerializationVersion(serializationVersion);

        // The offset must be captured before the header is consumed so it points at the record start.
        final long byteOffset = getBytesConsumed();
        final long eventId = in.readInt() + getFirstEventId();
        final int recordLength = in.readInt();

        return readRecord(in, eventId, byteOffset, recordLength);
    }

    /**
     * Reads {@code recordLength} encrypted bytes, decrypts them and deserializes the event.
     *
     * @param inputStream stream positioned immediately after the record header
     * @param eventId ID of the event; also used (as a decimal string) as the record identifier for decryption
     * @param startOffset byte offset at which the record started
     * @param recordLength number of encrypted bytes that make up the record
     * @return the deserialized event with its event ID set, or {@code null} if the schema reader produced no record
     * @throws IOException if the length is negative or the stream ends before the record is complete
     * @throws IllegalStateException if no encryptor has been configured
     */
    private StandardProvenanceEventRecord readRecord(final DataInputStream inputStream, final long eventId, final long startOffset, final int recordLength) throws IOException {
        // Fail fast with a descriptive error rather than a bare NullPointerException after the bytes were consumed.
        if (repositoryEncryptor == null) {
            throw new IllegalStateException("Repository Encryptor not configured for " + getDescription());
        }

        // The length comes straight from the file; a negative value would otherwise surface as an
        // unchecked NegativeArraySizeException from the array allocation below.
        if (recordLength < 0) {
            throw new IOException("Invalid record length " + recordLength + " for event ID " + eventId
                    + " at offset " + startOffset + " in " + getFilename());
        }

        final InputStream limitedIn = new LimitingInputStream(inputStream, recordLength);
        final byte[] encryptedSerializedBytes = new byte[recordLength];
        final DataInputStream encryptedInputStream = new DataInputStream(limitedIn);
        encryptedInputStream.readFully(encryptedSerializedBytes);

        final byte[] plainSerializedBytes = repositoryEncryptor.decrypt(encryptedSerializedBytes, Long.toString(eventId));
        final InputStream plainStream = new ByteArrayInputStream(plainSerializedBytes);

        final Record eventRecord = getRecordReader().readRecord(plainStream);
        if (eventRecord == null) {
            return null;
        }

        final StandardProvenanceEventRecord deserializedEvent = LookupTableEventRecord.getEvent(eventRecord, getFilename(), startOffset, getMaxAttributeLength(),
                getFirstEventId(), getSystemTimeOffset(), getComponentIds(), getComponentTypes(), getQueueIds(), getEventTypes());
        deserializedEvent.setEventId(eventId);
        return deserializedEvent;
    }

    /**
     * Advances through the stream until a record whose ID is at least {@code eventId} is found,
     * then decrypts and deserializes that record. Records with smaller IDs are skipped without
     * being decrypted.
     * <p>
     * Per the original note, this is copied from EventIdFirstSchemaRecordReader so that the
     * decrypting {@code readRecord} of this class is the one invoked.
     *
     * @param eventId the smallest event ID of interest
     * @param dis stream positioned at the start of a record
     * @param serializationVersion serialization version, verified before anything is read
     * @return the first event with an ID greater than or equal to {@code eventId}; empty if the data
     *         ends first or the schema reader produced no record
     * @throws IOException if the stream cannot be read or a record length is invalid
     */
    @Override
    protected Optional<StandardProvenanceEventRecord> readToEvent(final long eventId, final DataInputStream dis, final int serializationVersion) throws IOException {
        verifySerializationVersion(serializationVersion);

        while (isData(dis)) {
            final long startOffset = getBytesConsumed();
            final long id = dis.readInt() + getFirstEventId();
            final int recordLength = dis.readInt();

            if (id >= eventId) {
                final StandardProvenanceEventRecord event = readRecord(dis, id, startOffset, recordLength);
                return Optional.ofNullable(event);
            } else {
                // This is not the record we want. Skip over it instead of deserializing it.
                StreamUtils.skip(dis, recordLength);
            }
        }

        return Optional.empty();
    }

    @Override
    public String toString() {
        return getDescription();
    }

    /**
     * Builds a human-readable description of this reader.
     *
     * @return the TOC file path and journal filename, or a hash-based identifier if those cannot be obtained
     */
    private String getDescription() {
        try {
            return "EncryptedSchemaRecordReader, toc: " + getTocReader().getFile().getAbsolutePath() + ", journal: " + getFilename();
        } catch (Exception ignored) {
            // Best effort only: describing the reader must never fail, so fall back to an identity-style string.
            return "EncryptedSchemaRecordReader@" + Integer.toHexString(this.hashCode());
        }
    }

    /**
     * Sets the encryptor to use (necessary because the
     * {@link org.apache.nifi.provenance.serialization.RecordReaders#newRecordReader(File, Collection, int)} method doesn't accept the encryptor).
     *
     * @param repositoryEncryptor Repository Encryptor
     */
    void setRepositoryEncryptor(final RepositoryEncryptor<byte[], byte[]> repositoryEncryptor) {
        this.repositoryEncryptor = repositoryEncryptor;
    }
}