blob: e9fa0558e5d9f33f2dbabe223a6226cefb1aaf76 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License") you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard
import org.apache.nifi.components.PropertyDescriptor
import org.apache.nifi.flowfile.FlowFile
import org.apache.nifi.util.MockComponentLog
import org.apache.nifi.util.MockProcessSession
import org.apache.nifi.util.TestRunner
import org.apache.nifi.util.TestRunners
import org.bouncycastle.jce.provider.BouncyCastleProvider
import org.junit.After
import org.junit.Before
import org.junit.BeforeClass
import org.junit.Test
import org.junit.runner.RunWith
import org.junit.runners.JUnit4
import org.mockito.Mockito
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.nio.charset.StandardCharsets
import java.security.Security
import static org.mockito.ArgumentMatchers.anyBoolean
import static org.mockito.ArgumentMatchers.anyString
import static org.mockito.Mockito.doReturn
import static org.mockito.Mockito.spy
import static org.mockito.Mockito.when
/**
 * Unit tests for the {@link CountText} processor.
 *
 * Most scenarios feed the Jabberwocky poem (src/test/resources/TestCountText/jabberwocky.txt)
 * through the processor and compare the emitted count attributes against known totals.
 */
@RunWith(JUnit4.class)
class CountTextTest extends GroovyTestCase {
    private static final Logger logger = LoggerFactory.getLogger(CountTextTest.class)

    private static final String TLC = "text.line.count"
    private static final String TLNEC = "text.line.nonempty.count"
    private static final String TWC = "text.word.count"
    private static final String TCC = "text.character.count"

    /** Location of the shared Jabberwocky fixture, relative to the module root. */
    private static final String JABBERWOCKY_PATH = "src/test/resources/TestCountText/jabberwocky.txt"

    /**
     * Attribute values the processor is expected to emit for the Jabberwocky text:
     * 7 stanzas x 4 lines = 28 non-empty lines, plus 6 blank separator lines = 34 lines.
     * Declared after the key constants above so they are initialized first.
     */
    private static final Map<String, Integer> EXPECTED_JABBERWOCKY_COUNTS = [
            (TLC)  : 34,
            (TLNEC): 28,
            (TWC)  : 166,
            (TCC)  : 900,
    ].asImmutable()

    @BeforeClass
    static void setUpOnce() throws Exception {
        Security.addProvider(new BouncyCastleProvider())
        // Route calls to undefined logger methods (e.g. logger.mock(...)) to INFO
        logger.metaClass.methodMissing = { String name, args ->
            logger.info("[${name?.toUpperCase()}] ${(args as List).join(" ")}")
        }
    }

    @Test
    void testShouldCountAllMetrics() throws Exception {
        // Arrange
        final TestRunner runner = TestRunners.newTestRunner(CountText.class)
        configureMetrics(runner, true, true, true, true)

        // This text is the same as in src/test/resources/TestCountText/jabberwocky.txt but is copied here
        // to ensure that reading from a file vs. static text doesn't cause line break issues.
        // The blank lines between stanzas are significant: they account for 34 - 28 = 6 empty lines.
        String inputText = """’Twas brillig, and the slithy toves
Did gyre and gimble in the wade;
All mimsy were the borogoves,
And the mome raths outgrabe.

"Beware the Jabberwock, my son!
The jaws that bite, the claws that catch!
Beware the Jubjub bird, and shun
The frumious Bandersnatch!"

He took his vorpal sword in hand:
Long time the manxome foe he sought—
So rested he by the Tumtum tree,
And stood awhile in thought.

And as in uffish thought he stood,
The Jabberwock, with eyes of flame,
Came whiffling through the tulgey wood.
And burbled as it came!

One, two! One, two! And through and through
The vorpal blade went snicker-snack!
He left it dead, and with its head
He went galumphing back.

"And hast thou slain the Jabberwock?
Come to my arms, my beamish boy!
O frabjous day! Callooh! Callay!"
He chortled in his joy.

’Twas brillig, and the slithy toves
Did gyre and gimble in the wabe;
All mimsy were the borogoves,
And the mome raths outgrabe."""

        // Encode explicitly: the text contains non-ASCII characters (’ and —), so the
        // platform default charset must not influence the bytes handed to the processor
        runner.enqueue(inputText.getBytes(StandardCharsets.UTF_8))

        // Act
        runner.run()

        // Assert
        assertAllCounts(getSingleSuccessFlowFile(runner))
    }

    @Test
    void testShouldCountEachMetric() throws Exception {
        // Arrange
        final TestRunner runner = TestRunners.newTestRunner(CountText.class)
        final byte[] inputBytes = readJabberwockyBytes()

        // Each scenario enables exactly one metric; the value is the attribute that metric produces
        final Map<PropertyDescriptor, String> scenarios = [
                (CountText.TEXT_LINE_COUNT_PD)         : TLC,
                (CountText.TEXT_LINE_NONEMPTY_COUNT_PD): TLNEC,
                (CountText.TEXT_WORD_COUNT_PD)         : TWC,
                (CountText.TEXT_CHARACTER_COUNT_PD)    : TCC,
        ]

        scenarios.each { PropertyDescriptor enabledProperty, String enabledKey ->
            // Reset all metrics, then enable only the one under test
            configureMetrics(runner, false, false, false, false)
            runner.setProperty(enabledProperty, "true")

            runner.clearProvenanceEvents()
            runner.clearTransferState()
            runner.enqueue(inputBytes)

            // Act
            runner.run()

            // Assert
            FlowFile flowFile = getSingleSuccessFlowFile(runner)

            // The enabled metric must actually be emitted, otherwise the checks below are vacuous
            assert flowFile.attributes.containsKey(enabledKey): "Expected attribute ${enabledKey} to be set"

            // Any attribute that is emitted must carry the correct value
            EXPECTED_JABBERWOCKY_COUNTS.each { String key, Integer expected ->
                if (flowFile.attributes.containsKey(key)) {
                    assert flowFile.attributes.get(key) == String.valueOf(expected)
                }
            }
        }
    }

    @Test
    void testShouldCountWordsSplitOnSymbol() throws Exception {
        // Arrange
        final TestRunner runner = TestRunners.newTestRunner(CountText.class)
        final int expectedWordCount = 167

        configureMetrics(runner, false, false, true, false)
        runner.setProperty(CountText.SPLIT_WORDS_ON_SYMBOLS_PD, "true")

        runner.clearProvenanceEvents()
        runner.clearTransferState()
        runner.enqueue(readJabberwockyBytes())

        // Act
        runner.run()

        // Assert
        FlowFile flowFile = getSingleSuccessFlowFile(runner)
        assert flowFile.attributes.get(CountText.TEXT_WORD_COUNT) == String.valueOf(expectedWordCount)
    }

    @Test
    void testShouldCountIndependentlyPerFlowFile() throws Exception {
        // Arrange
        final TestRunner runner = TestRunners.newTestRunner(CountText.class)
        final byte[] inputBytes = readJabberwockyBytes()
        configureMetrics(runner, true, true, true, true)

        2.times { int i ->
            runner.clearProvenanceEvents()
            runner.clearTransferState()
            runner.enqueue(inputBytes)

            // Act
            runner.run()

            // Assert: every run reports the per-flowfile totals, not running totals
            assertAllCounts(getSingleSuccessFlowFile(runner))
        }
    }

    @Test
    void testShouldTrackSessionCountersAcrossMultipleFlowfiles() throws Exception {
        // Arrange
        final TestRunner runner = TestRunners.newTestRunner(CountText.class)
        final byte[] inputBytes = readJabberwockyBytes()
        configureMetrics(runner, true, true, true, true)

        MockProcessSession mockPS = runner.processSessionFactory.createSession()
        def sessionCounters = mockPS.sharedState.counterMap
        logger.info("Session counters (0): ${sessionCounters}")

        final int n = 2
        n.times { int i ->
            runner.clearTransferState()
            runner.enqueue(inputBytes)

            // Act
            runner.run()
            logger.info("Session counters (${i + 1}): ${sessionCounters}")

            // Assert
            assertAllCounts(getSingleSuccessFlowFile(runner))
        }

        // Session counters accumulate across all processed flowfiles
        assert sessionCounters.get("Lines Counted").get() == (EXPECTED_JABBERWOCKY_COUNTS[TLC] * n) as long
        assert sessionCounters.get("Lines (non-empty) Counted").get() == (EXPECTED_JABBERWOCKY_COUNTS[TLNEC] * n) as long
        assert sessionCounters.get("Words Counted").get() == (EXPECTED_JABBERWOCKY_COUNTS[TWC] * n) as long
        assert sessionCounters.get("Characters Counted").get() == (EXPECTED_JABBERWOCKY_COUNTS[TCC] * n) as long
    }

    @Test
    void testShouldHandleInternalError() throws Exception {
        // Arrange
        CountText ct = new CountText()
        ct.countLines = true
        ct.countLinesNonEmpty = true
        ct.countWords = true
        ct.countCharacters = true

        CountText ctSpy = Mockito.spy(ct)
        // Use doThrow(..).when(spy) so the real countWordsInLine() is not invoked while stubbing
        Mockito.doThrow(new IOException("Expected exception")).when(ctSpy).countWordsInLine(anyString(), anyBoolean())

        final TestRunner runner = TestRunners.newTestRunner(ctSpy)
        final String inputText = "This flowfile should throw an error"

        configureMetrics(runner, true, true, true, true)
        runner.setProperty(CountText.CHARACTER_ENCODING_PD, StandardCharsets.US_ASCII.displayName())

        runner.enqueue(inputText.getBytes(StandardCharsets.US_ASCII))

        // Act
        // Need initialize = true to run #onScheduled()
        runner.run(1, true, true)

        // Assert
        runner.assertAllFlowFilesTransferred(CountText.REL_FAILURE, 1)
        FlowFile flowFile = runner.getFlowFilesForRelationship(CountText.REL_FAILURE).first()
        logger.info("Generated flowfile: ${flowFile} | ${flowFile.attributes}")
    }

    @Test
    void testShouldIgnoreWhitespaceWordsWhenCounting() throws Exception {
        // Arrange
        final TestRunner runner = TestRunners.newTestRunner(CountText.class)
        final String inputText = "a  b  c"
        final int expectedWordCount = 3

        configureMetrics(runner, false, false, true, false)
        runner.setProperty(CountText.SPLIT_WORDS_ON_SYMBOLS_PD, "true")

        runner.clearProvenanceEvents()
        runner.clearTransferState()
        runner.enqueue(inputText.getBytes(StandardCharsets.UTF_8))

        // Act
        runner.run()

        // Assert
        FlowFile flowFile = getSingleSuccessFlowFile(runner)
        assert flowFile.attributes.get(CountText.TEXT_WORD_COUNT) == String.valueOf(expectedWordCount)
    }

    @Test
    void testShouldIgnoreWhitespaceWordsWhenCountingDebugMode() throws Exception {
        // Arrange
        MockComponentLog componentLogger = spy(new MockComponentLog("processorId", new CountText()))
        doReturn(true).when(componentLogger).isDebugEnabled()
        final TestRunner runner = TestRunners.newTestRunner(CountText.class, componentLogger)
        final String inputText = "a  b  c"
        final int expectedWordCount = 3

        configureMetrics(runner, false, false, true, false)
        runner.setProperty(CountText.SPLIT_WORDS_ON_SYMBOLS_PD, "true")

        runner.clearProvenanceEvents()
        runner.clearTransferState()
        runner.enqueue(inputText.getBytes(StandardCharsets.UTF_8))

        // Act
        runner.run()

        // Assert
        FlowFile flowFile = getSingleSuccessFlowFile(runner)
        assert flowFile.attributes.get(CountText.TEXT_WORD_COUNT) == String.valueOf(expectedWordCount)
    }

    // ---------------------------------------------------------------------------------------------
    // Helpers
    // ---------------------------------------------------------------------------------------------

    /** Reads the Jabberwocky fixture as raw bytes, with no charset decode/re-encode round trip. */
    private static byte[] readJabberwockyBytes() {
        return new File(JABBERWOCKY_PATH).bytes
    }

    /** Sets the four metric toggles of the processor in one call. */
    private static void configureMetrics(TestRunner runner, boolean lines, boolean nonEmptyLines,
                                         boolean words, boolean characters) {
        runner.setProperty(CountText.TEXT_LINE_COUNT_PD, String.valueOf(lines))
        runner.setProperty(CountText.TEXT_LINE_NONEMPTY_COUNT_PD, String.valueOf(nonEmptyLines))
        runner.setProperty(CountText.TEXT_WORD_COUNT_PD, String.valueOf(words))
        runner.setProperty(CountText.TEXT_CHARACTER_COUNT_PD, String.valueOf(characters))
    }

    /** Asserts exactly one flowfile went to success, logs it, and returns it. */
    private static FlowFile getSingleSuccessFlowFile(TestRunner runner) {
        runner.assertAllFlowFilesTransferred(CountText.REL_SUCCESS, 1)
        FlowFile flowFile = runner.getFlowFilesForRelationship(CountText.REL_SUCCESS).first()
        logger.info("Generated flowfile: ${flowFile} | ${flowFile.attributes}")
        return flowFile
    }

    /** Asserts that all four count attributes are present and match the Jabberwocky totals. */
    private static void assertAllCounts(FlowFile flowFile) {
        EXPECTED_JABBERWOCKY_COUNTS.each { String key, Integer expected ->
            assert flowFile.attributes.containsKey(key): "Expected attribute ${key} to be set"
            assert flowFile.attributes.get(key) == String.valueOf(expected)
        }
    }
}