| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.nifi.provenance |
| |
| import org.apache.nifi.flowfile.FlowFile |
| import org.apache.nifi.provenance.serialization.RecordReader |
| import org.apache.nifi.provenance.serialization.RecordWriter |
| import org.apache.nifi.provenance.toc.StandardTocReader |
| import org.apache.nifi.provenance.toc.StandardTocWriter |
| import org.apache.nifi.provenance.toc.TocReader |
| import org.apache.nifi.provenance.toc.TocUtil |
| import org.apache.nifi.provenance.toc.TocWriter |
| import org.apache.nifi.repository.encryption.AesGcmByteArrayRepositoryEncryptor |
| import org.apache.nifi.repository.encryption.RepositoryEncryptor |
| import org.apache.nifi.repository.encryption.configuration.EncryptionMetadataHeader |
| import org.apache.nifi.security.kms.KeyProvider |
| import org.bouncycastle.jce.provider.BouncyCastleProvider |
| import org.bouncycastle.util.encoders.Hex |
| import org.junit.jupiter.api.BeforeAll |
| import org.junit.jupiter.api.BeforeEach |
| import org.junit.jupiter.api.Test |
| import org.slf4j.Logger |
| import org.slf4j.LoggerFactory |
| |
| import javax.crypto.spec.SecretKeySpec |
| import java.security.KeyManagementException |
| import java.security.Security |
| import java.util.concurrent.atomic.AtomicLong |
| |
| import static groovy.test.GroovyAssert.shouldFail |
| import static org.apache.nifi.provenance.TestUtil.createFlowFile |
| |
| class EncryptedSchemaRecordReaderWriterTest extends AbstractTestRecordReaderWriter { |
| private static final Logger logger = LoggerFactory.getLogger(EncryptedSchemaRecordReaderWriterTest.class) |
| |
| private static final String KEY_HEX_128 = "0123456789ABCDEFFEDCBA9876543210" |
| private static final String KEY_HEX = KEY_HEX_128 |
| private static final String KEY_ID = "K1" |
| |
| private static final String TRANSIT_URI = "nifi://unit-test" |
| private static final String PROCESSOR_TYPE = "Mock Processor" |
| private static final String COMPONENT_ID = "1234" |
| |
| private static final int UNCOMPRESSED_BLOCK_SIZE = 1024 * 32 |
| private static final int MAX_ATTRIBUTE_SIZE = 2048 |
| |
| private static final AtomicLong idGenerator = new AtomicLong(0L) |
| private File journalFile |
| private File tocFile |
| |
| private static KeyProvider mockKeyProvider |
| private static RepositoryEncryptor<byte[], byte[]> repositoryEncryptor |
| |
| @BeforeAll |
| static void setUpOnce() throws Exception { |
| Security.addProvider(new BouncyCastleProvider()) |
| |
| mockKeyProvider = [ |
| getKey : { String keyId -> |
| if (keyId == KEY_ID) { |
| new SecretKeySpec(Hex.decode(KEY_HEX), "AES") |
| } else { |
| throw new KeyManagementException("${keyId} is not available") |
| } |
| }, |
| getAvailableKeyIds: { -> |
| [KEY_ID] |
| }, |
| keyExists : { String keyId -> |
| keyId == KEY_ID |
| }] as KeyProvider |
| repositoryEncryptor = new AesGcmByteArrayRepositoryEncryptor(mockKeyProvider, EncryptionMetadataHeader.PROVENANCE) |
| } |
| |
| @BeforeEach |
| void setUp() throws Exception { |
| journalFile = File.createTempFile(getClass().simpleName, ".journal") |
| journalFile.deleteOnExit() |
| tocFile = TocUtil.getTocFile(journalFile) |
| idGenerator.set(0L) |
| } |
| |
| private static |
| final FlowFile buildFlowFile(Map attributes = [:], long id = idGenerator.getAndIncrement(), long fileSize = 3000L) { |
| if (!attributes?.uuid) { |
| attributes.uuid = UUID.randomUUID().toString() |
| } |
| createFlowFile(id, fileSize, attributes) |
| } |
| |
| private |
| static ProvenanceEventRecord buildEventRecord(FlowFile flowfile = buildFlowFile(), ProvenanceEventType eventType = ProvenanceEventType.RECEIVE, String transitUri = TRANSIT_URI, String componentId = COMPONENT_ID, String componentType = PROCESSOR_TYPE, long eventTime = System.currentTimeMillis()) { |
| final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder() |
| builder.setEventTime(eventTime) |
| builder.setEventType(eventType) |
| builder.setTransitUri(transitUri) |
| builder.fromFlowFile(flowfile) |
| builder.setComponentId(componentId) |
| builder.setComponentType(componentType) |
| builder.build() |
| } |
| |
| @Override |
| protected RecordWriter createWriter( |
| final File file, |
| final TocWriter tocWriter, final boolean compressed, final int uncompressedBlockSize) throws IOException { |
| createWriter(file, tocWriter, compressed, uncompressedBlockSize, repositoryEncryptor) |
| } |
| |
| protected static RecordWriter createWriter( |
| final File file, |
| final TocWriter tocWriter, |
| final boolean compressed, |
| final int uncompressedBlockSize, RepositoryEncryptor<byte[], byte[]> encryptor) throws IOException { |
| return new EncryptedSchemaRecordWriter(file, idGenerator, tocWriter, compressed, uncompressedBlockSize, IdentifierLookup.EMPTY, encryptor, KEY_ID) |
| } |
| |
| @Override |
| protected RecordReader createReader( |
| final InputStream inputStream, |
| final String journalFilename, final TocReader tocReader, final int maxAttributeSize) throws IOException { |
| return new EncryptedSchemaRecordReader(inputStream, journalFilename, tocReader, maxAttributeSize, repositoryEncryptor) |
| } |
| |
| /** |
| * Build a record and write it to the repository with the encrypted writer. Recover with the encrypted reader and verify. |
| */ |
| @Test |
| void testShouldWriteAndReadEncryptedRecord() { |
| // Arrange |
| final ProvenanceEventRecord record = buildEventRecord() |
| logger.info("Built sample PER: ${record}") |
| |
| TocWriter tocWriter = new StandardTocWriter(tocFile, false, false) |
| |
| RecordWriter encryptedWriter = createWriter(journalFile, tocWriter, false, UNCOMPRESSED_BLOCK_SIZE) |
| logger.info("Generated encrypted writer: ${encryptedWriter}") |
| |
| // Act |
| long encryptedRecordId = idGenerator.get() |
| encryptedWriter.writeHeader(encryptedRecordId) |
| encryptedWriter.writeRecords(Collections.singletonList(record)) |
| encryptedWriter.close() |
| logger.info("Wrote encrypted record ${encryptedRecordId} to journal") |
| |
| // Assert |
| TocReader tocReader = new StandardTocReader(tocFile) |
| final FileInputStream fis = new FileInputStream(journalFile) |
| final RecordReader reader = createReader(fis, journalFile.getName(), tocReader, MAX_ATTRIBUTE_SIZE) |
| logger.info("Generated encrypted reader: ${reader}") |
| |
| ProvenanceEventRecord encryptedEvent = reader.nextRecord() |
| assert encryptedEvent |
| assert encryptedRecordId as long == encryptedEvent.getEventId() |
| assert record.componentId == encryptedEvent.getComponentId() |
| assert record.componentType == encryptedEvent.getComponentType() |
| logger.info("Successfully read encrypted record: ${encryptedEvent}") |
| |
| assert !reader.nextRecord() |
| } |
| |
| /** |
| * Build a record and write it with a standard writer and the encrypted writer to different repositories. Recover with the standard reader and the contents of the encrypted record should be unreadable. |
| */ |
| @Test |
| void testShouldWriteEncryptedRecordAndPlainRecord() { |
| // Arrange |
| final ProvenanceEventRecord record = buildEventRecord() |
| logger.info("Built sample PER: ${record}") |
| |
| TocWriter tocWriter = new StandardTocWriter(tocFile, false, false) |
| |
| RecordWriter standardWriter = new EventIdFirstSchemaRecordWriter(journalFile, idGenerator, tocWriter, false, UNCOMPRESSED_BLOCK_SIZE, IdentifierLookup.EMPTY) |
| logger.info("Generated standard writer: ${standardWriter}") |
| |
| File encryptedJournalFile = new File(journalFile.absolutePath + "_encrypted") |
| File encryptedTocFile = TocUtil.getTocFile(encryptedJournalFile) |
| TocWriter encryptedTocWriter = new StandardTocWriter(encryptedTocFile, false, false) |
| RecordWriter encryptedWriter = createWriter(encryptedJournalFile, encryptedTocWriter, false, UNCOMPRESSED_BLOCK_SIZE) |
| logger.info("Generated encrypted writer: ${encryptedWriter}") |
| |
| // Act |
| long standardRecordId = idGenerator.get() |
| standardWriter.writeHeader(standardRecordId) |
| standardWriter.writeRecords(Collections.singletonList(record)) |
| standardWriter.close() |
| logger.info("Wrote standard record ${standardRecordId} to journal") |
| |
| long encryptedRecordId = idGenerator.get() |
| encryptedWriter.writeHeader(encryptedRecordId) |
| encryptedWriter.writeRecords(Collections.singletonList(record)) |
| encryptedWriter.close() |
| logger.info("Wrote encrypted record ${encryptedRecordId} to journal") |
| |
| // Assert |
| TocReader tocReader = new StandardTocReader(tocFile) |
| final FileInputStream fis = new FileInputStream(journalFile) |
| final RecordReader reader = new EventIdFirstSchemaRecordReader(fis, journalFile.getName(), tocReader, MAX_ATTRIBUTE_SIZE) |
| logger.info("Generated standard reader: ${reader}") |
| |
| ProvenanceEventRecord standardEvent = reader.nextRecord() |
| assert standardEvent |
| assert standardRecordId as long == standardEvent.getEventId() |
| assert record.componentId == standardEvent.getComponentId() |
| assert record.componentType == standardEvent.getComponentType() |
| logger.info("Successfully read standard record: ${standardEvent}") |
| |
| assert !reader.nextRecord() |
| |
| // Demonstrate unable to read from encrypted file with standard reader |
| TocReader incompatibleTocReader = new StandardTocReader(encryptedTocFile) |
| final FileInputStream efis = new FileInputStream(encryptedJournalFile) |
| RecordReader incompatibleReader = new EventIdFirstSchemaRecordReader(efis, encryptedJournalFile.getName(), incompatibleTocReader, MAX_ATTRIBUTE_SIZE) |
| logger.info("Generated standard reader (attempting to read encrypted file): ${incompatibleReader}") |
| |
| shouldFail(EOFException) { incompatibleReader.nextRecord() } |
| } |
| } |