blob: ec25594648f627210082aa68b9eabbb5bf049466 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard
import org.apache.nifi.security.util.crypto.HashAlgorithm
import org.apache.nifi.security.util.crypto.HashService
import org.apache.nifi.util.MockFlowFile
import org.apache.nifi.util.TestRunner
import org.apache.nifi.util.TestRunners
import org.bouncycastle.jce.provider.BouncyCastleProvider
import org.junit.After
import org.junit.Before
import org.junit.BeforeClass
import org.junit.Test
import org.junit.runner.RunWith
import org.junit.runners.JUnit4
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.nio.charset.StandardCharsets
import java.security.Security
/**
 * Tests for the {@link CryptographicHashContent} processor. Each test enqueues content in a mock
 * flowfile, runs the processor once, and checks the relationship the flowfile was routed to and
 * the {@code content_<algorithm name>} attribute.
 */
@RunWith(JUnit4.class)
class CryptographicHashContentTest extends GroovyTestCase {
    private static final Logger logger = LoggerFactory.getLogger(CryptographicHashContentTest.class)

    @BeforeClass
    static void setUpOnce() throws Exception {
        Security.addProvider(new BouncyCastleProvider())

        // Any unknown method invoked on the logger is redirected to info(), prefixed with the method name
        logger.metaClass.methodMissing = { String name, args ->
            logger.info("[${name?.toUpperCase()}] ${(args as List).join(" ")}")
        }
    }

    @Before
    void setUp() throws Exception {
    }

    @After
    void tearDown() throws Exception {
    }

    /**
     * Computes (and logs) the reference hash of {@code content} via {@link HashService#hashValueStreaming}.
     * The content is encoded as UTF-8, the same encoding used when it is enqueued, so the expected
     * value does not depend on the platform default charset.
     *
     * @param algorithm the hash algorithm to use
     * @param content the content to hash
     * @return the expected hash value
     */
    private static String expectedStreamingHash(HashAlgorithm algorithm, String content) {
        final byte[] contentBytes = content.getBytes(StandardCharsets.UTF_8)
        final String expectedHash = HashService.hashValueStreaming(algorithm, new ByteArrayInputStream(contentBytes))
        logger.info("Expected ${algorithm.name.padLeft(11)}: ${expectedHash}")
        expectedHash
    }

    /**
     * Clears properties, provenance events, and transfer state so one runner can be reused per algorithm.
     *
     * @param runner the runner to reset
     */
    private static void resetRunner(TestRunner runner) {
        runner.clearProperties()
        runner.clearProvenanceEvents()
        runner.clearTransferState()
    }

    /**
     * Sets the processor's hash algorithm property to the given algorithm's name.
     *
     * @param runner the runner to configure
     * @param algorithm the algorithm to select
     */
    private static void selectAlgorithm(TestRunner runner, HashAlgorithm algorithm) {
        logger.info("Setting hash algorithm to ${algorithm.name}")
        runner.setProperty(CryptographicHashContent.HASH_ALGORITHM, algorithm.name)
    }

    /**
     * Runs the processor once, asserts that exactly one flowfile went to success (and none to
     * failure), and returns the value of its {@code content_<algorithm name>} attribute.
     *
     * @param runner the configured runner with one flowfile enqueued
     * @param algorithm the algorithm whose attribute is read
     * @return the hash attribute value written to the flowfile
     */
    private static String runAndReadHashFromSuccess(TestRunner runner, HashAlgorithm algorithm) {
        // Act
        runner.run(1)

        // Assert
        runner.assertTransferCount(CryptographicHashContent.REL_FAILURE, 0)
        runner.assertTransferCount(CryptographicHashContent.REL_SUCCESS, 1)

        final List<MockFlowFile> successfulFlowfiles = runner.getFlowFilesForRelationship(CryptographicHashContent.REL_SUCCESS)

        // Extract the generated attributes from the flowfile
        MockFlowFile flowFile = successfulFlowfiles.first()
        String hashAttribute = "content_${algorithm.name}"
        flowFile.assertAttributeExists(hashAttribute)

        String hashedContent = flowFile.getAttribute(hashAttribute)
        logger.info("flowfile.${hashAttribute} = ${hashedContent}")
        hashedContent
    }

    /**
     * For every {@link HashAlgorithm}, enqueues {@code content} with the given attributes, runs the
     * processor, and asserts the resulting hash attribute equals the reference hash.
     *
     * @param content the flowfile content
     * @param attributes the attributes to attach to the enqueued flowfile
     */
    private static void assertContentHashedForEveryAlgorithm(String content, Map<String, String> attributes) {
        final TestRunner runner = TestRunners.newTestRunner(new CryptographicHashContent())

        for (HashAlgorithm algorithm : HashAlgorithm.values()) {
            final String expectedHash = expectedStreamingHash(algorithm, content)

            // Reset the processor
            resetRunner(runner)

            // Set the algorithm
            selectAlgorithm(runner, algorithm)

            // Insert the content in the mock flowfile
            runner.enqueue(content.getBytes(StandardCharsets.UTF_8), attributes)

            String hashedContent = runAndReadHashFromSuccess(runner, algorithm)
            assert hashedContent == expectedHash
        }
    }

    @Test
    void testShouldCalculateHashOfPresentContent() {
        // Generate some long content (11 chars * 8192 = 90,112 bytes, ~90 KB)
        final String LONG_CONTENT = "apachenifi " * 8192

        assertContentHashedForEveryAlgorithm(LONG_CONTENT, [size: LONG_CONTENT.length() as String])
    }

    @Test
    void testShouldCalculateHashOfEmptyContent() {
        final String EMPTY_CONTENT = ""

        assertContentHashedForEveryAlgorithm(EMPTY_CONTENT, [size: "0"])
    }

    /**
     * This test works because {@link MockFlowFile} uses the actual internal {@code data.size} for
     * {@code getSize()}, while {@code StandardFlowFileRecord} uses a separate {@code size} field.
     * May need to use {@code flowfile.getContentClaim().getLength()}.
     */
    @Test
    void testShouldCalculateHashOfContentWithIncorrectSizeAttribute() {
        final String NON_EMPTY_CONTENT = "apachenifi"

        // The "size" attribute deliberately disagrees with the real content length
        assertContentHashedForEveryAlgorithm(NON_EMPTY_CONTENT, [size: "0"])
    }

    @Test
    void testShouldOverwriteExistingAttribute() {
        // Arrange
        final String NON_EMPTY_CONTENT = "apachenifi"
        final String OLD_HASH_ATTRIBUTE_VALUE = "OLD VALUE"

        HashAlgorithm algorithm = HashAlgorithm.SHA256

        final TestRunner runner = TestRunners.newTestRunner(new CryptographicHashContent())

        final String EXPECTED_CONTENT_HASH = HashService.hashValue(algorithm, NON_EMPTY_CONTENT)
        logger.info("Expected ${algorithm.name.padLeft(11)}: ${EXPECTED_CONTENT_HASH}")

        // Set the algorithm
        selectAlgorithm(runner, algorithm)

        // Insert the content in the mock flowfile (with an existing attribute)
        def oldAttributes = [("content_${algorithm.name}".toString()): OLD_HASH_ATTRIBUTE_VALUE]
        runner.enqueue(NON_EMPTY_CONTENT.getBytes(StandardCharsets.UTF_8), oldAttributes)

        // Act + Assert
        String hashedContent = runAndReadHashFromSuccess(runner, algorithm)

        assert hashedContent != OLD_HASH_ATTRIBUTE_VALUE
        assert hashedContent == EXPECTED_CONTENT_HASH
    }

    @Test
    void testShouldRouteToFailureOnEmptyContent() {
        // Arrange
        final String EMPTY_CONTENT = ""

        final TestRunner runner = TestRunners.newTestRunner(new CryptographicHashContent())

        for (HashAlgorithm algorithm : HashAlgorithm.values()) {
            // Computed only so the reference value appears in the log; no hash attribute is expected
            expectedStreamingHash(algorithm, EMPTY_CONTENT)

            // Reset the processor
            resetRunner(runner)

            // Set the failure property
            logger.info("Setting fail when empty to true")
            runner.setProperty(CryptographicHashContent.FAIL_WHEN_EMPTY, "true")

            // Set the algorithm
            selectAlgorithm(runner, algorithm)

            // Insert the content in the mock flowfile
            runner.enqueue(EMPTY_CONTENT.getBytes(StandardCharsets.UTF_8))

            // Act
            runner.run(1)

            // Assert
            runner.assertTransferCount(CryptographicHashContent.REL_FAILURE, 1)
            runner.assertTransferCount(CryptographicHashContent.REL_SUCCESS, 0)

            final List<MockFlowFile> failedFlowfiles = runner.getFlowFilesForRelationship(CryptographicHashContent.REL_FAILURE)

            // The failed flowfile must not carry a hash attribute
            MockFlowFile flowFile = failedFlowfiles.first()
            String hashAttribute = "content_${algorithm.name}"
            flowFile.assertAttributeNotExists(hashAttribute)
        }
    }
}