blob: d6a54185a2eb7fdd8dd7819aef04888bc0fc3ebb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.repository
import org.apache.commons.lang3.SystemUtils
import org.apache.nifi.controller.queue.FlowFileQueue
import org.apache.nifi.controller.repository.claim.ResourceClaimManager
import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager
import org.apache.nifi.security.kms.CryptoUtils
import org.apache.nifi.security.repository.config.FlowFileRepositoryEncryptionConfiguration
import org.bouncycastle.jce.provider.BouncyCastleProvider
import org.bouncycastle.util.encoders.Hex
import org.junit.After
import org.junit.Assume
import org.junit.Before
import org.junit.BeforeClass
import org.junit.Rule
import org.junit.Test
import org.junit.rules.TestName
import org.junit.runner.RunWith
import org.junit.runners.JUnit4
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.wali.SerDe
import java.security.Security
import static org.apache.nifi.security.kms.CryptoUtils.STATIC_KEY_PROVIDER_CLASS_NAME
@RunWith(JUnit4.class)
class EncryptedSchemaRepositoryRecordSerdeTest extends GroovyTestCase {
private static final Logger logger = LoggerFactory.getLogger(EncryptedSchemaRepositoryRecordSerdeTest.class)
public static final String TEST_QUEUE_IDENTIFIER = "testQueueIdentifier"
private ResourceClaimManager claimManager
private Map<String, FlowFileQueue> queueMap
private FlowFileQueue flowFileQueue
private ByteArrayOutputStream byteArrayOutputStream
private DataOutputStream dataOutputStream
// TODO: Mock the wrapped serde
// TODO: Make integration test with real wrapped serde
private SerDe<RepositoryRecord> wrappedSerDe
private static final String KPI = STATIC_KEY_PROVIDER_CLASS_NAME
private static final String KPL = ""
private static final String KEY_ID = "K1"
private static final Map<String, String> KEYS = [K1: "0123456789ABCDEFFEDCBA98765432100123456789ABCDEFFEDCBA9876543210"]
// TODO: Change to WAL impl name
private static final String REPO_IMPL = CryptoUtils.EWAFFR_CLASS_NAME
private FlowFileRepositoryEncryptionConfiguration flowFileREC
private EncryptedSchemaRepositoryRecordSerde esrrs
@Rule
public TestName testName = new TestName()
@BeforeClass
static void setUpOnce() throws Exception {
Assume.assumeTrue("Test only runs on *nix", !SystemUtils.IS_OS_WINDOWS)
Security.addProvider(new BouncyCastleProvider())
logger.metaClass.methodMissing = { String name, args ->
logger.info("[${name?.toUpperCase()}] ${(args as List).join(" ")}")
}
}
@Before
void setUp() throws Exception {
claimManager = new StandardResourceClaimManager()
queueMap = [:]
flowFileQueue = createAndRegisterMockQueue(TEST_QUEUE_IDENTIFIER)
byteArrayOutputStream = new ByteArrayOutputStream()
dataOutputStream = new DataOutputStream(byteArrayOutputStream)
wrappedSerDe = new SchemaRepositoryRecordSerde(claimManager)
wrappedSerDe.setQueueMap(queueMap)
flowFileREC = new FlowFileRepositoryEncryptionConfiguration(KPI, KPL, KEY_ID, KEYS, REPO_IMPL)
esrrs = new EncryptedSchemaRepositoryRecordSerde(wrappedSerDe, flowFileREC)
}
@After
void tearDown() throws Exception {
claimManager.purge()
queueMap.clear()
}
private FlowFileQueue createMockQueue(String identifier = testName.methodName + new Date().toString()) {
[getIdentifier: { ->
logger.mock("Retrieving flowfile queue identifier: ${identifier}")
identifier
}] as FlowFileQueue
}
private FlowFileQueue createAndRegisterMockQueue(String identifier = testName.methodName + new Date().toString()) {
FlowFileQueue queue = createMockQueue(identifier)
queueMap.put(identifier, queue)
queue
}
private RepositoryRecord buildCreateRecord(FlowFileQueue queue, Map<String, String> attributes = [:]) {
StandardRepositoryRecord record = new StandardRepositoryRecord(queue)
StandardFlowFileRecord.Builder ffrb = new StandardFlowFileRecord.Builder().id(System.nanoTime())
ffrb.addAttributes([uuid: getMockUUID()] + attributes as Map<String, String>)
record.setWorking(ffrb.build())
record
}
private String getMockUUID() {
"${testName.methodName ?: "no_test"}@${new Date().format("mmssSSS")}" as String
}
/** This test ensures that the creation of a flowfile record is applied to the specified output stream correctly */
@Test
void testShouldSerializeAndDeserializeRecord() {
// Arrange
RepositoryRecord newRecord = buildCreateRecord(flowFileQueue, [id: "1", firstName: "Andy", lastName: "LoPresto"])
DataOutputStream dos = dataOutputStream
esrrs.writeHeader(dataOutputStream)
// Act
esrrs.serializeRecord(newRecord, dos)
logger.info("Output stream: ${Hex.toHexString(byteArrayOutputStream.toByteArray())} ")
DataInputStream dis = new DataInputStream(new ByteArrayInputStream(byteArrayOutputStream.toByteArray()))
esrrs.readHeader(dis)
RepositoryRecord deserializedRecord = esrrs.deserializeRecord(dis, 2)
/* The records will not be identical, because the process of serializing/deserializing changes the application
* of the delta data. The CREATE with a "current" record containing attributes becomes an UPDATE with an
* "original" record containing attributes */
// Assert
logger.info(" Original record: ${newRecord.dump()}")
logger.info("Deserialized record: ${deserializedRecord.dump()}")
assert newRecord.type == RepositoryRecordType.CREATE
assert deserializedRecord.type == RepositoryRecordType.UPDATE
assert deserializedRecord.originalAttributes == newRecord.current.attributes
}
/** This test ensures that the creation of a flowfile record is applied to the specified output stream correctly */
@Test
void testShouldSerializeAndDeserializeEdit() {
// Arrange
RepositoryRecord newRecord = buildCreateRecord(flowFileQueue, [id: "1", firstName: "Andy", lastName: "LoPresto"])
DataOutputStream dos = dataOutputStream
esrrs.writeHeader(dataOutputStream)
// Act
esrrs.serializeEdit(null, newRecord, dos)
logger.info("Output stream: ${Hex.toHexString(byteArrayOutputStream.toByteArray())} ")
DataInputStream dis = new DataInputStream(new ByteArrayInputStream(byteArrayOutputStream.toByteArray()))
esrrs.readHeader(dis)
RepositoryRecord deserializedRecord = esrrs.deserializeEdit(dis, [:], 2)
/* The records will not be identical, because the process of serializing/deserializing changes the application
* of the delta data. The CREATE with a "current" record containing attributes becomes an UPDATE with an
* "original" record containing attributes */
// Assert
logger.info(" Original record: ${newRecord.dump()}")
logger.info("Deserialized record: ${deserializedRecord.dump()}")
assert newRecord.type == RepositoryRecordType.CREATE
assert deserializedRecord.type == RepositoryRecordType.UPDATE
assert deserializedRecord.originalAttributes == newRecord.current.attributes
}
/** This test ensures that the creation of a flowfile record is applied to the specified output stream correctly with encryption */
@Test
void testShouldEncryptOutput() {
// Arrange
RepositoryRecord newRecord = buildCreateRecord(flowFileQueue, [id: "1", firstName: "Andy", lastName: "LoPresto"])
DataOutputStream dos = dataOutputStream
esrrs.writeHeader(dataOutputStream)
// Act
esrrs.serializeRecord(newRecord, dos)
byte[] serializedBytes = byteArrayOutputStream.toByteArray()
def hexOutput = Hex.toHexString(serializedBytes)
logger.info("Output stream (${serializedBytes.length} bytes): ${hexOutput} ")
DataInputStream dis = new DataInputStream(new ByteArrayInputStream(serializedBytes))
esrrs.readHeader(dis)
RepositoryRecord deserializedRecord = esrrs.deserializeRecord(dis, 2)
/* The records will not be identical, because the process of serializing/deserializing changes the application
* of the delta data. The CREATE with a "current" record containing attributes becomes an UPDATE with an
* "original" record containing attributes */
// Assert
logger.info(" Original record: ${newRecord.dump()}")
logger.info("Deserialized record: ${deserializedRecord.dump()}")
assert newRecord.type == RepositoryRecordType.CREATE
assert deserializedRecord.type == RepositoryRecordType.UPDATE
assert deserializedRecord.originalAttributes == newRecord.current.attributes
// Ensure the value is not present in plaintext
assert !hexOutput.contains(Hex.toHexString("Andy".bytes))
// Ensure the value is not present in "simple" encryption (reversing bytes)
assert !hexOutput.contains(Hex.toHexString("ydnA".bytes))
// Ensure the encryption metadata is present in the output
assert hexOutput.contains(Hex.toHexString("org.apache.nifi.security.repository.block.BlockEncryptionMetadata".bytes))
}
/** This test ensures that multiple records can be serialized and deserialized */
@Test
void testShouldSerializeAndDeserializeMultipleRecords() {
// Arrange
RepositoryRecord record1 = buildCreateRecord(flowFileQueue, [id: "1", firstName: "Andy", lastName: "LoPresto"])
RepositoryRecord record2 = buildCreateRecord(flowFileQueue, [id: "2", firstName: "Mark", lastName: "Payne"])
DataOutputStream dos = dataOutputStream
// Act
esrrs.writeHeader(dos)
esrrs.serializeRecord(record1, dos)
esrrs.serializeRecord(record2, dos)
dos.flush()
logger.info("Output stream: ${Hex.toHexString(byteArrayOutputStream.toByteArray())} ")
DataInputStream dis = new DataInputStream(new ByteArrayInputStream(byteArrayOutputStream.toByteArray()))
esrrs.readHeader(dis)
RepositoryRecord deserializedRecord1 = esrrs.deserializeRecord(dis, esrrs.getVersion())
RepositoryRecord deserializedRecord2 = esrrs.deserializeRecord(dis, esrrs.getVersion())
/* The records will not be identical, because the process of serializing/deserializing changes the application
* of the delta data. The CREATE with a "current" record containing attributes becomes an UPDATE with an
* "original" record containing attributes */
logger.info("Original record 1: ${record1.dump()}")
logger.info("Original record 2: ${record2.dump()}")
logger.info("Deserialized record 1: ${deserializedRecord1.dump()}")
logger.info("Deserialized record 2: ${deserializedRecord2.dump()}")
// Assert
assert record1.type == RepositoryRecordType.CREATE
assert record2.type == RepositoryRecordType.CREATE
assert deserializedRecord1.type == RepositoryRecordType.UPDATE
assert deserializedRecord1.originalAttributes == record1.current.attributes
assert deserializedRecord2.type == RepositoryRecordType.UPDATE
assert deserializedRecord2.originalAttributes == record2.current.attributes
}
/** This test ensures that multiple records can be serialized and deserialized using different keys */
@Test
void testShouldSerializeAndDeserializeMultipleRecordsWithMultipleKeys() {
// Arrange
RepositoryRecord record1 = buildCreateRecord(flowFileQueue, [id: "1", firstName: "Andy", lastName: "LoPresto"])
RepositoryRecord record2 = buildCreateRecord(flowFileQueue, [id: "2", firstName: "Mark", lastName: "Payne"])
DataOutputStream dos = dataOutputStream
// Configure the serde with multiple keys available
def multipleKeys = KEYS + [K2: "0F" * 32]
FlowFileRepositoryEncryptionConfiguration multipleKeyFFREC = new FlowFileRepositoryEncryptionConfiguration(KPI, KPL, KEY_ID, multipleKeys, REPO_IMPL, null)
esrrs = new EncryptedSchemaRepositoryRecordSerde(wrappedSerDe, multipleKeyFFREC)
assert esrrs.getActiveKeyId() == "K1"
// Act
esrrs.writeHeader(dos)
esrrs.serializeRecord(record1, dos)
// Change the active key
esrrs.setActiveKeyId("K2")
esrrs.serializeRecord(record2, dos)
dos.flush()
logger.info("Output stream: ${Hex.toHexString(byteArrayOutputStream.toByteArray())} ")
DataInputStream dis = new DataInputStream(new ByteArrayInputStream(byteArrayOutputStream.toByteArray()))
esrrs.readHeader(dis)
RepositoryRecord deserializedRecord1 = esrrs.deserializeRecord(dis, esrrs.getVersion())
RepositoryRecord deserializedRecord2 = esrrs.deserializeRecord(dis, esrrs.getVersion())
/* The records will not be identical, because the process of serializing/deserializing changes the application
* of the delta data. The CREATE with a "current" record containing attributes becomes an UPDATE with an
* "original" record containing attributes */
logger.info("Original record 1: ${record1.dump()}")
logger.info("Original record 2: ${record2.dump()}")
logger.info("Deserialized record 1: ${deserializedRecord1.dump()}")
logger.info("Deserialized record 2: ${deserializedRecord2.dump()}")
// Assert
assert record1.type == RepositoryRecordType.CREATE
assert record2.type == RepositoryRecordType.CREATE
assert deserializedRecord1.type == RepositoryRecordType.UPDATE
assert deserializedRecord1.originalAttributes == record1.current.attributes
assert deserializedRecord2.type == RepositoryRecordType.UPDATE
assert deserializedRecord2.originalAttributes == record2.current.attributes
}
}