blob: d9f8a3fef59a4ecfbf096a7f275ea4ec938a7d86 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License") you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard
import org.apache.nifi.security.util.crypto.HashAlgorithm
import org.apache.nifi.security.util.crypto.HashService
import org.apache.nifi.util.MockFlowFile
import org.apache.nifi.util.TestRunner
import org.apache.nifi.util.TestRunners
import org.bouncycastle.jce.provider.BouncyCastleProvider
import org.junit.After
import org.junit.Before
import org.junit.BeforeClass
import org.junit.Test
import org.junit.runner.RunWith
import org.junit.runners.JUnit4
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.nio.charset.Charset
import java.nio.charset.StandardCharsets
import java.security.Security
import java.time.ZonedDateTime
import java.time.format.DateTimeFormatter
@RunWith(JUnit4.class)
class CryptographicHashAttributeTest extends GroovyTestCase {
private static final Logger logger = LoggerFactory.getLogger(CryptographicHashAttributeTest.class)
@BeforeClass
static void setUpOnce() throws Exception {
Security.addProvider(new BouncyCastleProvider())
logger.metaClass.methodMissing = { String name, args ->
logger.info("[${name?.toUpperCase()}] ${(args as List).join(" ")}")
}
}
@Before
void setUp() throws Exception {
}
@After
void tearDown() throws Exception {
}
@Test
void testShouldCalculateHashOfPresentAttribute() {
// Arrange
def algorithms = HashAlgorithm.values()
final TestRunner runner = TestRunners.newTestRunner(new CryptographicHashAttribute())
// Create attributes for username and date
def attributes = [
username: "alopresto",
// FIXME groovy-datetime dependency not providing the "format" method for new Date().format("YYYY-MM-dd HH:mm:ss.SSS Z")
// adding the following workaround temporarily
date : ZonedDateTime.now().format(DateTimeFormatter.ofPattern('YYYY-MM-dd HH:mm:ss.SSS Z'))
]
def attributeKeys = attributes.keySet()
algorithms.each { HashAlgorithm algorithm ->
final EXPECTED_USERNAME_HASH = HashService.hashValue(algorithm, attributes["username"])
logger.expected("${algorithm.name.padLeft(11)}(${attributes["username"]}) = ${EXPECTED_USERNAME_HASH}")
final EXPECTED_DATE_HASH = HashService.hashValue(algorithm, attributes["date"])
logger.expected("${algorithm.name.padLeft(11)}(${attributes["date"]}) = ${EXPECTED_DATE_HASH}")
// Reset the processor
runner.clearProperties()
runner.clearProvenanceEvents()
runner.clearTransferState()
// Set the algorithm
logger.info("Setting hash algorithm to ${algorithm.name}")
runner.setProperty(CryptographicHashAttribute.HASH_ALGORITHM, algorithm.name)
// Add the desired dynamic properties
attributeKeys.each { String attr ->
runner.setProperty(attr, "${attr}_${algorithm.name}")
}
// Insert the attributes in the mock flowfile
runner.enqueue(new byte[0], attributes)
// Act
runner.run(1)
// Assert
runner.assertTransferCount(CryptographicHashAttribute.REL_FAILURE, 0)
runner.assertTransferCount(CryptographicHashAttribute.REL_SUCCESS, 1)
final List<MockFlowFile> successfulFlowfiles = runner.getFlowFilesForRelationship(CryptographicHashAttribute.REL_SUCCESS)
// Extract the generated attributes from the flowfile
MockFlowFile flowFile = successfulFlowfiles.first()
String hashedUsername = flowFile.getAttribute("username_${algorithm.name}")
logger.info("flowfile.username_${algorithm.name} = ${hashedUsername}")
String hashedDate = flowFile.getAttribute("date_${algorithm.name}")
logger.info("flowfile.date_${algorithm.name} = ${hashedDate}")
assert hashedUsername == EXPECTED_USERNAME_HASH
assert hashedDate == EXPECTED_DATE_HASH
}
}
@Test
void testShouldCalculateHashOfMissingAttribute() {
// Arrange
def algorithms = HashAlgorithm.values()
final TestRunner runner = TestRunners.newTestRunner(new CryptographicHashAttribute())
// Create attributes for username (empty string) and date (null)
def attributes = [
username: "",
date : null
]
def attributeKeys = attributes.keySet()
algorithms.each { HashAlgorithm algorithm ->
final EXPECTED_USERNAME_HASH = HashService.hashValue(algorithm, attributes["username"])
logger.expected("${algorithm.name.padLeft(11)}(${attributes["username"]}) = ${EXPECTED_USERNAME_HASH}")
final EXPECTED_DATE_HASH = null
logger.expected("${algorithm.name.padLeft(11)}(${attributes["date"]}) = ${EXPECTED_DATE_HASH}")
// Reset the processor
runner.clearProperties()
runner.clearProvenanceEvents()
runner.clearTransferState()
// Set the algorithm
logger.info("Setting hash algorithm to ${algorithm.name}")
runner.setProperty(CryptographicHashAttribute.HASH_ALGORITHM, algorithm.name)
// Add the desired dynamic properties
attributeKeys.each { String attr ->
runner.setProperty(attr, "${attr}_${algorithm.name}")
}
// Insert the attributes in the mock flowfile
runner.enqueue(new byte[0], attributes)
// Act
runner.run(1)
// Assert
runner.assertTransferCount(CryptographicHashAttribute.REL_FAILURE, 0)
runner.assertTransferCount(CryptographicHashAttribute.REL_SUCCESS, 1)
final List<MockFlowFile> successfulFlowfiles = runner.getFlowFilesForRelationship(CryptographicHashAttribute.REL_SUCCESS)
// Extract the generated attributes from the flowfile
MockFlowFile flowFile = successfulFlowfiles.first()
String hashedUsername = flowFile.getAttribute("username_${algorithm.name}")
logger.info("flowfile.username_${algorithm.name} = ${hashedUsername}")
String hashedDate = flowFile.getAttribute("date_${algorithm.name}")
logger.info("flowfile.date_${algorithm.name} = ${hashedDate}")
assert hashedUsername == EXPECTED_USERNAME_HASH
assert hashedDate == EXPECTED_DATE_HASH
}
}
@Test
void testShouldRouteToFailureOnProhibitedMissingAttribute() {
// Arrange
def algorithms = HashAlgorithm.values()
final TestRunner runner = TestRunners.newTestRunner(new CryptographicHashAttribute())
// Create attributes for username (empty string) and date (null)
def attributes = [
username: "",
date : null
]
def attributeKeys = attributes.keySet()
algorithms.each { HashAlgorithm algorithm ->
final EXPECTED_USERNAME_HASH = HashService.hashValue(algorithm, attributes["username"])
logger.expected("${algorithm.name.padLeft(11)}(${attributes["username"]}) = ${EXPECTED_USERNAME_HASH}")
final EXPECTED_DATE_HASH = null
logger.expected("${algorithm.name.padLeft(11)}(${attributes["date"]}) = ${EXPECTED_DATE_HASH}")
// Reset the processor
runner.clearProperties()
runner.clearProvenanceEvents()
runner.clearTransferState()
// Set the algorithm
logger.info("Setting hash algorithm to ${algorithm.name}")
runner.setProperty(CryptographicHashAttribute.HASH_ALGORITHM, algorithm.name)
// Set to fail if there are missing attributes
runner.setProperty(CryptographicHashAttribute.PARTIAL_ATTR_ROUTE_POLICY, CryptographicHashAttribute.PartialAttributePolicy.PROHIBIT.name())
// Add the desired dynamic properties
attributeKeys.each { String attr ->
runner.setProperty(attr, "${attr}_${algorithm.name}")
}
// Insert the attributes in the mock flowfile
runner.enqueue(new byte[0], attributes)
// Act
runner.run(1)
// Assert
runner.assertTransferCount(CryptographicHashAttribute.REL_FAILURE, 1)
runner.assertTransferCount(CryptographicHashAttribute.REL_SUCCESS, 0)
final List<MockFlowFile> failedFlowFiles = runner.getFlowFilesForRelationship(CryptographicHashAttribute.REL_FAILURE)
// Extract the generated attributes from the flowfile
MockFlowFile flowFile = failedFlowFiles.first()
logger.info("Failed flowfile has attributes ${flowFile.attributes}")
attributeKeys.each { String missingAttribute ->
flowFile.assertAttributeNotExists("${missingAttribute}_${algorithm.name}")
}
}
}
@Test
void testShouldRouteToFailureOnEmptyAttributes() {
// Arrange
def algorithms = HashAlgorithm.values()
final TestRunner runner = TestRunners.newTestRunner(new CryptographicHashAttribute())
// Create attributes for username (empty string) and date (null)
def attributes = [
username: "",
date : null
]
def attributeKeys = attributes.keySet()
algorithms.each { HashAlgorithm algorithm ->
// Reset the processor
runner.clearProperties()
runner.clearProvenanceEvents()
runner.clearTransferState()
// Set the algorithm
logger.info("Setting hash algorithm to ${algorithm.name}")
runner.setProperty(CryptographicHashAttribute.HASH_ALGORITHM, algorithm.name)
// Set to fail if all attributes are missing
runner.setProperty(CryptographicHashAttribute.FAIL_WHEN_EMPTY, "true")
// Insert the attributes in the mock flowfile
runner.enqueue(new byte[0], attributes)
// Act
runner.run(1)
// Assert
runner.assertTransferCount(CryptographicHashAttribute.REL_FAILURE, 1)
runner.assertTransferCount(CryptographicHashAttribute.REL_SUCCESS, 0)
final List<MockFlowFile> failedFlowFiles = runner.getFlowFilesForRelationship(CryptographicHashAttribute.REL_FAILURE)
// Extract the generated attributes from the flowfile
MockFlowFile flowFile = failedFlowFiles.first()
logger.info("Failed flowfile has attributes ${flowFile.attributes}")
attributeKeys.each { String missingAttribute ->
flowFile.assertAttributeNotExists("${missingAttribute}_${algorithm.name}")
}
}
}
@Test
void testShouldRouteToSuccessOnAllowPartial() {
// Arrange
def algorithms = HashAlgorithm.values()
final TestRunner runner = TestRunners.newTestRunner(new CryptographicHashAttribute())
// Create attributes for username (empty string) and date (null)
def attributes = [
username: ""
]
def attributeKeys = attributes.keySet()
algorithms.each { HashAlgorithm algorithm ->
final EXPECTED_USERNAME_HASH = HashService.hashValue(algorithm, attributes["username"])
logger.expected("${algorithm.name.padLeft(11)}(${attributes["username"]}) = ${EXPECTED_USERNAME_HASH}")
final EXPECTED_DATE_HASH = null
logger.expected("${algorithm.name.padLeft(11)}(${attributes["date"]}) = ${EXPECTED_DATE_HASH}")
// Reset the processor
runner.clearProperties()
runner.clearProvenanceEvents()
runner.clearTransferState()
// Set the algorithm
logger.info("Setting hash algorithm to ${algorithm.name}")
runner.setProperty(CryptographicHashAttribute.HASH_ALGORITHM, algorithm.name)
// Set to fail if there are missing attributes
runner.setProperty(CryptographicHashAttribute.PARTIAL_ATTR_ROUTE_POLICY, CryptographicHashAttribute.PartialAttributePolicy.ALLOW.name())
// Add the desired dynamic properties
attributeKeys.each { String attr ->
runner.setProperty(attr, "${attr}_${algorithm.name}")
}
// Insert the attributes in the mock flowfile
runner.enqueue(new byte[0], attributes)
// Act
runner.run(1)
// Assert
runner.assertTransferCount(CryptographicHashAttribute.REL_FAILURE, 0)
runner.assertTransferCount(CryptographicHashAttribute.REL_SUCCESS, 1)
final List<MockFlowFile> successfulFlowFiles = runner.getFlowFilesForRelationship(CryptographicHashAttribute.REL_SUCCESS)
// Extract the generated attributes from the flowfile
MockFlowFile flowFile = successfulFlowFiles.first()
logger.info("Successful flowfile has attributes ${flowFile.attributes}")
attributeKeys.each { String attribute ->
flowFile.assertAttributeExists("${attribute}_${algorithm.name}")
}
}
}
@Test
void testShouldCalculateHashWithVariousCharacterEncodings() {
// Arrange
final TestRunner runner = TestRunners.newTestRunner(new CryptographicHashAttribute())
// Create attributes
def attributes = [test_attribute: "apachenifi"]
def attributeKeys = attributes.keySet()
HashAlgorithm algorithm = HashAlgorithm.MD5
List<Charset> charsets = [StandardCharsets.UTF_8, StandardCharsets.UTF_16, StandardCharsets.UTF_16LE, StandardCharsets.UTF_16BE]
final def EXPECTED_MD5_HASHES = [
"utf_8" : "a968b5ec1d52449963dcc517789baaaf",
"utf_16" : "b8413d18f7e64042bb0322a1cd61eba2",
"utf_16be": "b8413d18f7e64042bb0322a1cd61eba2",
"utf_16le": "91c3b67f9f8ae77156f21f271cc09121",
]
EXPECTED_MD5_HASHES.each { k, hash ->
logger.expected("MD5(${k.padLeft(9)}(${attributes["test_attribute"]})) = ${hash}")
}
charsets.each { Charset charset ->
// Calculate the expected hash value given the character set
final EXPECTED_HASH = HashService.hashValue(algorithm, attributes["test_attribute"], charset)
logger.expected("${algorithm.name}(${attributes["test_attribute"]}, ${charset.name()}) = ${EXPECTED_HASH}")
// Sanity check
assert EXPECTED_HASH == EXPECTED_MD5_HASHES[translateEncodingToMapKey(charset.name())]
// Reset the processor
runner.clearProperties()
runner.clearProvenanceEvents()
runner.clearTransferState()
// Set the properties
logger.info("Setting hash algorithm to ${algorithm.name}")
runner.setProperty(CryptographicHashAttribute.HASH_ALGORITHM, algorithm.name)
logger.info("Setting character set to ${charset.name()}")
runner.setProperty(CryptographicHashAttribute.CHARACTER_SET, charset.name())
// Add the desired dynamic properties
attributeKeys.each { String attr ->
runner.setProperty(attr, "${attr}_${algorithm.name}")
}
// Insert the attributes in the mock flowfile
runner.enqueue(new byte[0], attributes)
// Act
runner.run(1)
// Assert
runner.assertTransferCount(CryptographicHashAttribute.REL_FAILURE, 0)
runner.assertTransferCount(CryptographicHashAttribute.REL_SUCCESS, 1)
final List<MockFlowFile> successfulFlowfiles = runner.getFlowFilesForRelationship(CryptographicHashAttribute.REL_SUCCESS)
// Extract the generated attributes from the flowfile
MockFlowFile flowFile = successfulFlowfiles.first()
String hashedAttribute = flowFile.getAttribute("test_attribute_${algorithm.name}")
logger.info("flowfile.test_attribute_${algorithm.name} = ${hashedAttribute}")
assert hashedAttribute == EXPECTED_HASH
}
}
static String translateEncodingToMapKey(String charsetName) {
charsetName.toLowerCase().replaceAll(/[-\/]/, '_')
}
}