blob: bb00309ec0e79a88b7103029c0f004839d94d584 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.integration.processgroup;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyDescriptor.Builder;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.controller.ComponentNode;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.queue.DropFlowFileState;
import org.apache.nifi.controller.queue.DropFlowFileStatus;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.integration.FrameworkIntegrationTest;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.NiFiProperties;
import org.junit.Assert;
import org.junit.Test;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class StandardProcessGroupIT extends FrameworkIntegrationTest {
@Test
public void testProcessGroupDefaults() {
// Connect two processors with default settings of the root process group
ProcessorNode sourceProcessor = createGenerateProcessor(1);
ProcessorNode destinationProcessor = createProcessorNode((context, session) -> {});
Connection connection = connect(sourceProcessor, destinationProcessor, Collections.singleton(REL_SUCCESS));
// Verify all defaults are in place on the process group and the connection
assertEquals("0 sec", getRootGroup().getDefaultFlowFileExpiration());
assertEquals(NiFiProperties.DEFAULT_BACKPRESSURE_COUNT, (long) getRootGroup().getDefaultBackPressureObjectThreshold());
assertEquals(NiFiProperties.DEFAULT_BACKPRESSURE_SIZE, getRootGroup().getDefaultBackPressureDataSizeThreshold());
assertEquals("0 sec", connection.getFlowFileQueue().getFlowFileExpiration());
assertEquals(NiFiProperties.DEFAULT_BACKPRESSURE_COUNT, (long) connection.getFlowFileQueue().getBackPressureObjectThreshold());
assertEquals(NiFiProperties.DEFAULT_BACKPRESSURE_SIZE, connection.getFlowFileQueue().getBackPressureDataSizeThreshold());
// Update default settings of the process group, and create a new connection
getRootGroup().setDefaultFlowFileExpiration("99 min");
getRootGroup().setDefaultBackPressureObjectThreshold(99L);
getRootGroup().setDefaultBackPressureDataSizeThreshold("99 MB");
ProcessorNode sourceProcessor1 = createGenerateProcessor(1);
ProcessorNode destinationProcessor1 = createProcessorNode((context, session) -> {});
Connection connection1 = connect(sourceProcessor1, destinationProcessor1, Collections.singleton(REL_SUCCESS));
// Verify updated settings are in place on the process group and the connection
assertEquals("99 min", getRootGroup().getDefaultFlowFileExpiration());
assertEquals(99, (long) getRootGroup().getDefaultBackPressureObjectThreshold());
assertEquals("99 MB", getRootGroup().getDefaultBackPressureDataSizeThreshold());
assertEquals("99 min", connection1.getFlowFileQueue().getFlowFileExpiration());
assertEquals(99, (long) connection1.getFlowFileQueue().getBackPressureObjectThreshold());
assertEquals("99 MB", connection1.getFlowFileQueue().getBackPressureDataSizeThreshold());
}
@Test
public void testDropAllFlowFilesFromOneConnection() throws Exception {
ProcessorNode sourceProcessGroup = createGenerateProcessor(1);
ProcessorNode destinationProcessGroup = createProcessorNode((context, session) -> {});
Connection connection = connect(sourceProcessGroup, destinationProcessGroup, Collections.singleton(REL_SUCCESS));
for (int i = 0; i < 5; i++) {
triggerOnce(sourceProcessGroup);
}
assertEquals(5, connection.getFlowFileQueue().size().getObjectCount());
// WHEN
DropFlowFileStatus dropFlowFileStatus = getRootGroup().dropAllFlowFiles("requestId", "unimportant");
while (dropFlowFileStatus.getState() != DropFlowFileState.COMPLETE) {
TimeUnit.MILLISECONDS.sleep(10);
dropFlowFileStatus = getRootGroup().getDropAllFlowFilesStatus("requestId");
}
// THEN
assertEquals(0, connection.getFlowFileQueue().size().getObjectCount());
}
@Test
public void testDropAllFlowFilesFromOneConnectionInChildProcessGroup() throws Exception {
ProcessGroup childProcessGroup = getFlowController().getFlowManager().createProcessGroup("childProcessGroup");
childProcessGroup.setName("ChildProcessGroup");
getRootGroup().addProcessGroup(childProcessGroup);
ProcessorNode sourceProcessGroup = createGenerateProcessor(1);
moveProcessor(sourceProcessGroup, childProcessGroup);
ProcessorNode destinationProcessGroup = createProcessorNode((context, session) -> {});
moveProcessor(destinationProcessGroup, childProcessGroup);
Connection connection = connect(childProcessGroup, sourceProcessGroup, destinationProcessGroup, Collections.singleton(REL_SUCCESS));
for (int i = 0; i < 5; i++) {
triggerOnce(sourceProcessGroup);
}
assertEquals(5, connection.getFlowFileQueue().size().getObjectCount());
// WHEN
DropFlowFileStatus dropFlowFileStatus = getRootGroup().dropAllFlowFiles("requestId", "unimportant");
while (dropFlowFileStatus.getState() != DropFlowFileState.COMPLETE) {
TimeUnit.MILLISECONDS.sleep(10);
dropFlowFileStatus = childProcessGroup.getDropAllFlowFilesStatus("requestId");
}
// THEN
assertEquals(0, connection.getFlowFileQueue().size().getObjectCount());
}
@Test
public void testDropAllFlowFilesFromMultipleConnections() throws Exception {
ProcessGroup childProcessGroup = getFlowController().getFlowManager().createProcessGroup("childProcessGroup");
childProcessGroup.setName("ChildProcessGroup");
getRootGroup().addProcessGroup(childProcessGroup);
ProcessorNode sourceProcessGroup1 = createGenerateProcessor(4);
moveProcessor(sourceProcessGroup1, childProcessGroup);
ProcessorNode sourceProcessGroup2 = createGenerateProcessor(5);
moveProcessor(sourceProcessGroup2, childProcessGroup);
ProcessorNode destinationProcessGroup = createProcessorNode((context, session) -> {});
moveProcessor(destinationProcessGroup, childProcessGroup);
Connection connection1 = connect(childProcessGroup, sourceProcessGroup1, destinationProcessGroup, Collections.singleton(REL_SUCCESS));
Connection connection2 = connect(childProcessGroup, sourceProcessGroup2, destinationProcessGroup, Collections.singleton(REL_SUCCESS));
for (int i = 0; i < 5; i++) {
triggerOnce(sourceProcessGroup1);
}
assertEquals(5, connection1.getFlowFileQueue().size().getObjectCount());
for (int i = 0; i < 10; i++) {
triggerOnce(sourceProcessGroup2);
}
assertEquals(10, connection2.getFlowFileQueue().size().getObjectCount());
// WHEN
DropFlowFileStatus dropFlowFileStatus = getRootGroup().dropAllFlowFiles("requestId", "unimportant");
while (dropFlowFileStatus.getState() != DropFlowFileState.COMPLETE) {
TimeUnit.MILLISECONDS.sleep(10);
dropFlowFileStatus = childProcessGroup.getDropAllFlowFilesStatus("requestId");
}
// THEN
assertEquals(5 + 10, dropFlowFileStatus.getOriginalSize().getObjectCount());
assertEquals(20 + 50, dropFlowFileStatus.getOriginalSize().getByteCount());
assertEquals(0, dropFlowFileStatus.getCurrentSize().getObjectCount());
assertEquals(0, dropFlowFileStatus.getCurrentSize().getByteCount());
assertEquals(5 + 10, dropFlowFileStatus.getDroppedSize().getObjectCount());
assertEquals(20 + 50, dropFlowFileStatus.getDroppedSize().getByteCount());
assertEquals(0, connection1.getFlowFileQueue().size().getObjectCount());
assertEquals(0, connection2.getFlowFileQueue().size().getObjectCount());
}
@Test
public void testComponentsAffectedByVariableOverridden() {
final ProcessGroup child = getFlowController().getFlowManager().createProcessGroup("child");
child.setName("Child");
child.setVariables(Collections.singletonMap("number", "5"));
getRootGroup().setVariables(Collections.singletonMap("number", "1"));
getRootGroup().addProcessGroup(child);
final ProcessorNode processor = createProcessorNode(NumberRefProcessor.class);
processor.setProperties(Collections.singletonMap(NumberRefProcessor.NUMBER.getName(), "${number}"));
moveProcessor(processor, child);
final Set<ComponentNode> componentsAffected = child.getComponentsAffectedByVariable("number");
assertEquals(1, componentsAffected.size());
assertTrue(componentsAffected.contains(processor));
final Set<ComponentNode> rootAffected = getRootGroup().getComponentsAffectedByVariable("number");
assertTrue(rootAffected.isEmpty());
processor.setScheduldingPeriod("1 hour");
child.startProcessor(processor, false);
getRootGroup().setVariables(Collections.singletonMap("number", "2"));
try {
child.setVariables(Collections.singletonMap("number", "10"));
Assert.fail("Updated variable that is referenced by a running processor");
} catch (final IllegalStateException ise) {
// Expected
}
child.stopProcessor(processor);
}
public static class NumberRefProcessor extends AbstractProcessor {
static final PropertyDescriptor NUMBER = new Builder()
.name("Number")
.displayName("Number")
.description("A Number")
.required(true)
.addValidator(StandardValidators.NUMBER_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.defaultValue("1")
.build();
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return Collections.singletonList(NUMBER);
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
}
}
}