blob: 5983a1cc6f4f7feb6341f3bed6eadaeafe4cf764 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.repository;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.util.Map;
import java.util.Objects;
import org.apache.nifi.repository.encryption.AesGcmByteArrayRepositoryEncryptor;
import org.apache.nifi.repository.encryption.RepositoryEncryptor;
import org.apache.nifi.repository.encryption.configuration.EncryptedRepositoryType;
import org.apache.nifi.repository.encryption.configuration.EncryptionMetadataHeader;
import org.apache.nifi.repository.encryption.configuration.kms.RepositoryKeyProviderFactory;
import org.apache.nifi.repository.encryption.configuration.kms.StandardRepositoryKeyProviderFactory;
import org.apache.nifi.security.kms.KeyProvider;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.NiFiProperties;
import org.wali.SerDe;
import org.wali.UpdateType;
/**
* This class is an implementation of the {@link SerDe} interface which provides transparent
* encryption/decryption of flowfile record data during file system interaction. As of Apache NiFi 1.11.0
* (January 2020), this implementation is considered <a href="https://nifi.apache.org/docs/nifi-docs/html/user-guide.html#experimental-warning">*experimental*</a>. For further details, review the
* <a href="https://nifi.apache.org/docs/nifi-docs/html/user-guide.html#encrypted-flowfile">Apache NiFi User Guide -
* Encrypted FlowFile Repository</a> and
* <a href="https://nifi.apache.org/docs/nifi-docs/html/administration-guide.html#encrypted-flowfile-repository-properties">Apache NiFi Admin Guide - Encrypted FlowFile
* Repository Properties</a>.
*/
public class EncryptedSchemaRepositoryRecordSerde implements SerDe<SerializedRepositoryRecord> {
private final SerDe<SerializedRepositoryRecord> wrappedSerDe;
private final RepositoryEncryptor<byte[], byte[]> encryptor;
private final String keyId;
/**
* Creates an instance of the serializer/deserializer which wraps another SerDe instance but transparently encrypts/decrypts the data before/after writing/reading from the streams.
*
* @param wrappedSerDe the wrapped SerDe instance which performs the object <-> bytes (de)serialization
* @param niFiProperties the configuration values necessary to encrypt/decrypt the data
*/
public EncryptedSchemaRepositoryRecordSerde(final SerDe<SerializedRepositoryRecord> wrappedSerDe, final NiFiProperties niFiProperties) {
this.wrappedSerDe = Objects.requireNonNull(wrappedSerDe, "Wrapped SerDe required");
final RepositoryKeyProviderFactory repositoryKeyProviderFactory = new StandardRepositoryKeyProviderFactory();
final KeyProvider keyProvider = repositoryKeyProviderFactory.getKeyProvider(EncryptedRepositoryType.FLOWFILE, niFiProperties);
this.encryptor = new AesGcmByteArrayRepositoryEncryptor(keyProvider, EncryptionMetadataHeader.FLOWFILE);
this.keyId = niFiProperties.getFlowFileRepoEncryptionKeyId();
}
@Override
public void writeHeader(final DataOutputStream out) throws IOException {
wrappedSerDe.writeHeader(out);
}
@Override
public void readHeader(final DataInputStream in) throws IOException {
wrappedSerDe.readHeader(in);
}
/**
* <p>
* Serializes an Edit Record to the log via the given
* {@link DataOutputStream}.
* </p>
*
* @param previousRecordState previous state
* @param newRecordState new state
* @param out stream to write to
* @throws IOException if fail during write
* @deprecated it is not beneficial to serialize the deltas, so this method just passes through to
* {@link #serializeRecord(SerializedRepositoryRecord, DataOutputStream)}. It is preferable to use that method directly.
*/
@Deprecated
@Override
public void serializeEdit(final SerializedRepositoryRecord previousRecordState,
final SerializedRepositoryRecord newRecordState,
final DataOutputStream out) throws IOException {
serializeRecord(newRecordState, out);
}
/**
* Serializes the provided {@link RepositoryRecord} to the provided stream in an encrypted format.
*
* @param record the record to encrypt and serialize
* @param out the output stream to write to
* @throws IOException if there is a problem writing to the stream
*/
@Override
public void serializeRecord(final SerializedRepositoryRecord record, final DataOutputStream out) throws IOException {
// Create BAOS wrapped in DOS to intercept the output
final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
final DataOutputStream tempDataStream = new DataOutputStream(byteArrayOutputStream);
// Use the delegate to serialize the actual record and extract the output
final String recordId = getRecordIdentifier(record).toString();
wrappedSerDe.serializeRecord(record, tempDataStream);
tempDataStream.flush();
final byte[] plainSerializedBytes = byteArrayOutputStream.toByteArray();
// Encrypt the serialized byte[] to the real stream
encryptToStream(plainSerializedBytes, recordId, out);
}
/**
* Encrypts the plain serialized bytes and writes them to the output stream. Precedes
* the cipher bytes with the plaintext to allow for on-demand deserialization and decryption.
*
* @param plainSerializedBytes the plain serialized bytes
* @param recordId the unique identifier for this record to be stored in the encryption metadata
* @param out the output stream
* @throws IOException if there is a problem writing to the stream
*/
private void encryptToStream(final byte[] plainSerializedBytes, final String recordId, final DataOutputStream out) throws IOException {
final byte[] cipherBytes = encryptor.encrypt(plainSerializedBytes, recordId, keyId);
out.writeInt(cipherBytes.length);
out.write(cipherBytes);
}
/**
* <p>
* Reads an Edit Record from the given {@link DataInputStream} and merges
* that edit with the current version of the record, returning the new,
* merged version. If the Edit Record indicates that the entity was deleted,
* must return a Record with an UpdateType of {@link UpdateType#DELETE}.
* This method must never return <code>null</code>.
* </p>
*
* @param in to deserialize from
* @param currentRecordStates an unmodifiable map of Record ID's to the
* current state of that record
* @param version the version of the SerDe that was used to serialize the
* edit record
* @return deserialized record
* @throws IOException if failure reading
* @deprecated it is not beneficial to serialize the deltas, so this method throws a {@link EOFException}. It is preferable to use {@link #deserializeRecord(DataInputStream, int)}.
*/
@Deprecated
@Override
public SerializedRepositoryRecord deserializeEdit(final DataInputStream in,
final Map<Object, SerializedRepositoryRecord> currentRecordStates,
final int version) throws IOException {
return deserializeRecord(in, version);
// deserializeRecord may return a null if there is no more data. However, when we are deserializing
// an edit, we do so only when we know that we should have data. This is why the JavaDocs for this method
// on the interface indicate that this method should never return null. As a result, if there is no data
// available, we handle this by throwing an EOFException.
// throw new EOFException();
}
/**
* Returns the deserialized and decrypted {@link RepositoryRecord} from the input stream.
*
* @param in stream to read from
* @param version the version of the SerDe that was used to serialize the
* record
* @return the deserialized record
* @throws IOException if there is a problem reading from the stream
*/
@Override
public SerializedRepositoryRecord deserializeRecord(final DataInputStream in, final int version) throws IOException {
// Read the expected length of the encrypted record (including the encryption metadata)
int encryptedRecordLength = in.readInt();
if (encryptedRecordLength == -1) {
return null;
}
// Read the encrypted record bytes
byte[] cipherBytes = new byte[encryptedRecordLength];
StreamUtils.fillBuffer(in, cipherBytes);
// Decrypt the byte[]
final DataInputStream wrappedInputStream = decryptToStream(cipherBytes);
// Deserialize the plain bytes using the delegate serde
return wrappedSerDe.deserializeRecord(wrappedInputStream, version);
}
/**
* Returns a {@link DataInputStream} containing the plaintext bytes so the record can be properly deserialized by the wrapped serde.
*
* @param cipherBytes the serialized, encrypted bytes
* @return a stream wrapping the plain bytes
*/
private DataInputStream decryptToStream(final byte[] cipherBytes) {
final byte[] plainSerializedBytes = encryptor.decrypt(cipherBytes, "[pending record ID]");
final ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(plainSerializedBytes);
return new DataInputStream(byteArrayInputStream);
}
/**
* Returns the unique ID for the given record.
*
* @param record to obtain identifier for
* @return identifier of record
*/
@Override
public Object getRecordIdentifier(final SerializedRepositoryRecord record) {
return wrappedSerDe.getRecordIdentifier(record);
}
/**
* Returns the UpdateType for the given record.
*
* @param record to retrieve update type for
* @return update type
*/
@Override
public UpdateType getUpdateType(final SerializedRepositoryRecord record) {
return wrappedSerDe.getUpdateType(record);
}
/**
* Returns the external location of the given record; this is used when a
* record is moved away from WALI or is being re-introduced to WALI. For
* example, WALI can be updated with a record of type
* {@link UpdateType#SWAP_OUT} that indicates a Location of
* file://tmp/external1 and can then be re-introduced to WALI by updating
* WALI with a record of type {@link UpdateType#CREATE} that indicates a
* Location of file://tmp/external1
*
* @param record to get location of
* @return location
*/
@Override
public String getLocation(final SerializedRepositoryRecord record) {
return wrappedSerDe.getLocation(record);
}
/**
* Returns the version that this SerDe will use when writing. This used used
* when serializing/deserializing the edit logs so that if the version
* changes, we are still able to deserialize old versions
*
* @return version
*/
@Override
public int getVersion() {
return wrappedSerDe.getVersion();
}
}