// blob: da35e8c60d86ba31f9a60e8156deee2a75657889 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.tests.system.parameters;
import org.apache.nifi.tests.system.NiFiSystemIT;
import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
import org.apache.nifi.toolkit.cli.impl.client.nifi.ParamContextClient;
import org.apache.nifi.web.api.dto.ParameterContextDTO;
import org.apache.nifi.web.api.dto.ParameterDTO;
import org.apache.nifi.web.api.dto.ProcessorConfigDTO;
import org.apache.nifi.web.api.entity.AffectedComponentEntity;
import org.apache.nifi.web.api.entity.ControllerServiceEntity;
import org.apache.nifi.web.api.entity.ParameterContextEntity;
import org.apache.nifi.web.api.entity.ParameterContextUpdateRequestEntity;
import org.apache.nifi.web.api.entity.ParameterEntity;
import org.apache.nifi.web.api.entity.ProcessGroupEntity;
import org.apache.nifi.web.api.entity.ProcessorEntity;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
public class ParameterContextIT extends NiFiSystemIT {
/**
 * Creates a Parameter Context holding a single non-sensitive parameter and checks that both the
 * creation response and a subsequent fetch by id describe that parameter.
 */
@Test
public void testCreateParameterContext() throws NiFiClientException, IOException {
    final ParameterEntity fooParameter = createParameterEntity("foo", null, false, "bar");
    final Set<ParameterEntity> parameters = new HashSet<>();
    parameters.add(fooParameter);

    final ParameterContextEntity toCreate = createParameterContextEntity(getTestName(), "System Test for verifying creation of Parameter Context", parameters);
    final ParamContextClient client = getNifiClient().getParamContextClient();

    // The entity handed back by the create call must already describe the parameter.
    final ParameterContextEntity created = client.createParamContext(toCreate);
    assertSingleFooCreation(created);

    // Looking the context up again by id must yield the same content.
    final ParameterContextEntity refetched = client.getParamContext(created.getId());
    assertSingleFooCreation(refetched);
}
/**
 * Asserts that the given entity describes a Parameter Context named after the current test that
 * contains exactly one parameter: a non-sensitive "foo" with no description and the value "bar".
 *
 * @param entity the Parameter Context entity to verify
 */
private void assertSingleFooCreation(final ParameterContextEntity entity) {
    final ParameterContextDTO returnedDto = entity.getComponent();
    assertEquals(getTestName(), returnedDto.getName());

    final Set<ParameterEntity> returnedParamEntities = returnedDto.getParameters();
    assertEquals(1, returnedParamEntities.size());

    final ParameterDTO returnedParamDto = returnedParamEntities.iterator().next().getParameter();
    assertEquals("foo", returnedParamDto.getName());
    assertNull(returnedParamDto.getDescription());
    // Compare by value: a boxed Boolean is not guaranteed to be the canonical Boolean.FALSE instance.
    assertEquals(Boolean.FALSE, returnedParamDto.getSensitive());
    assertEquals("bar", returnedParamDto.getValue());
}
/**
 * Creates a Parameter Context with a single sensitive parameter and checks, via
 * {@code assertSensitiveParametersNotReturned}, both the creation response and a later fetch by id.
 */
@Test
public void testSensitiveParametersNotReturned() throws NiFiClientException, IOException {
    final Set<ParameterEntity> sensitiveParameters = new HashSet<>();
    sensitiveParameters.add(createParameterEntity("foo", null, true, "bar"));
    final ParameterContextEntity toCreate = createParameterContextEntity(getTestName(), null, sensitiveParameters);

    final ParamContextClient client = getNifiClient().getParamContextClient();

    // Check the response of the create call first ...
    final ParameterContextEntity created = client.createParamContext(toCreate);
    assertSensitiveParametersNotReturned(created);

    // ... and then what a fresh lookup by id reports.
    final ParameterContextEntity refetched = client.getParamContext(created.getId());
    assertSensitiveParametersNotReturned(refetched);
}
/**
 * Asserts that the given entity describes a Parameter Context named after the current test that
 * contains exactly one parameter: a sensitive "foo" with no description whose reported value is
 * the mask "********" rather than the value it was created with.
 *
 * @param entity the Parameter Context entity to verify
 */
private void assertSensitiveParametersNotReturned(final ParameterContextEntity entity) {
    final ParameterContextDTO dto = entity.getComponent();
    assertEquals(getTestName(), dto.getName());

    final Set<ParameterEntity> returnedParamEntities = dto.getParameters();
    assertEquals(1, returnedParamEntities.size());

    final ParameterDTO returnedParamDto = returnedParamEntities.iterator().next().getParameter();
    assertEquals("foo", returnedParamDto.getName());
    assertNull(returnedParamDto.getDescription());
    // Compare by value: a boxed Boolean is not guaranteed to be the canonical Boolean.TRUE instance.
    assertEquals(Boolean.TRUE, returnedParamDto.getSensitive());
    assertEquals("********", returnedParamDto.getValue());
}
/**
 * Configures a CountEvents processor to reference the parameter "foo" before any Parameter Context
 * is bound, waits for it to be invalid, then binds a context defining "foo" to the root group and
 * waits for the processor to become valid.
 */
@Test
public void testAddingMissingParameterMakesProcessorValid() throws NiFiClientException, IOException, InterruptedException {
    final ProcessorEntity countEvents = createProcessor(TEST_PROCESSORS_PACKAGE + ".CountEvents", NIFI_GROUP_ID, TEST_EXTENSIONS_ARTIFACT_ID, getNiFiVersion());
    final String processorId = countEvents.getId();

    // Reference a parameter that has not been defined yet.
    final ProcessorConfigDTO processorConfig = countEvents.getComponent().getConfig();
    processorConfig.setProperties(Collections.singletonMap("Name", "#{foo}"));
    getNifiClient().getProcessorClient().updateProcessor(countEvents);
    waitForInvalidProcessor(processorId);

    // Define the parameter and bind its context to the root group.
    final Set<ParameterEntity> contextParameters = new HashSet<>();
    contextParameters.add(createParameterEntity("foo", null, false, "bar"));
    final ParameterContextEntity toCreate = createParameterContextEntity(getTestName(), null, contextParameters);
    final ParameterContextEntity created = getNifiClient().getParamContextClient().createParamContext(toCreate);
    setParameterContext("root", created);

    waitForValidProcessor(processorId);
}
/**
 * Binds GenerateFlowFile's "File Size" property to the parameter "foo" and checks the processor's
 * validity while the parameter is missing, set to "1 KB", and finally updated to a null value.
 */
@Test
public void testValidationWithRequiredPropertiesAndDefault() throws NiFiClientException, IOException, InterruptedException {
    final ProcessorEntity generateFlowFile = getClientUtil().createProcessor("GenerateFlowFile");
    getClientUtil().updateProcessorProperties(generateFlowFile, Collections.singletonMap("File Size", "#{foo}"));
    getClientUtil().setAutoTerminatedRelationships(generateFlowFile, "success");

    final String processorId = generateFlowFile.getId();
    waitForInvalidProcessor(processorId);

    // Give the parameter a concrete value and bind the context to the root group.
    final Set<ParameterEntity> initialParameters = new HashSet<>();
    final ParameterEntity oneKilobyte = createParameterEntity("foo", null, false, "1 KB");
    initialParameters.add(oneKilobyte);
    final ParameterContextEntity toCreate = createParameterContextEntity(getTestName(), null, initialParameters);
    final ParameterContextEntity created = getNifiClient().getParamContextClient().createParamContext(toCreate);
    setParameterContext("root", created);
    waitForValidProcessor(processorId);

    // Replace the parameter with one that carries a null value.
    final ParameterEntity withoutValue = createParameterEntity("foo", null, false, null);
    created.getComponent().setParameters(Collections.singleton(withoutValue));
    getNifiClient().getParamContextClient().updateParamContext(created);

    // The property has a default, so the processor is expected to stay valid.
    waitForValidProcessor(processorId);
}
/**
 * Binds the "Always Required" property of a DependOnProperties processor to the parameter "foo" and
 * checks the processor's validity while the parameter is missing, set to "bar", and finally has its
 * value explicitly removed.
 */
@Test(timeout=30000)
public void testValidationWithRequiredPropertiesAndNoDefault() throws NiFiClientException, IOException, InterruptedException {
    final ProcessorEntity dependOnProperties = getClientUtil().createProcessor("DependOnProperties");

    final Map<String, String> processorProperties = new HashMap<>();
    processorProperties.put("Always Required", "#{foo}");
    processorProperties.put("Required If Always Required Is Bar Or Baz", "15");
    getClientUtil().updateProcessorProperties(dependOnProperties, processorProperties);

    final String processorId = dependOnProperties.getId();
    waitForInvalidProcessor(processorId);

    // Define "foo" and bind the context to the root group.
    final Set<ParameterEntity> initialParameters = new HashSet<>();
    final ParameterEntity barValue = createParameterEntity("foo", null, false, "bar");
    initialParameters.add(barValue);
    final ParameterContextEntity toCreate = createParameterContextEntity(getTestName(), null, initialParameters);
    final ParameterContextEntity created = getNifiClient().getParamContextClient().createParamContext(toCreate);
    setParameterContext("root", created);
    waitForValidProcessor(processorId);

    // Submit "foo" with a null value and flag that the value is to be explicitly removed.
    final ParameterEntity valueRemoved = createParameterEntity("foo", null, false, null);
    valueRemoved.getParameter().setValueRemoved(true);
    created.getComponent().setParameters(Collections.singleton(valueRemoved));
    getNifiClient().getParamContextClient().updateParamContext(created);

    // The property is required and has no default, so the processor is expected to turn invalid.
    waitForInvalidProcessor(processorId);
}
/**
 * Exercises parameters whose values contain Expression Language.
 *
 * <p>A GenerateFlowFile processor feeds an EvaluatePropertiesWithDifferentELScopes processor whose three
 * properties each reference a parameter. The test first checks the "flowfile" and "variable.registry"
 * counters and the absence of "no.el.evaluation", then swaps in a second Parameter Context in which
 * "Eleven A" is the literal "11" and waits for the "no.el.evaluation" counter to reach 11 per node.</p>
 */
@Test
public void testParametersReferencingEL() throws NiFiClientException, IOException, InterruptedException {
    final ProcessorEntity generate = getClientUtil().createProcessor("GenerateFlowFile");
    getClientUtil().updateProcessorProperties(generate, Collections.singletonMap("a", "1"));
    getClientUtil().updateProcessorSchedulingPeriod(generate, "10 min");

    final ProcessorEntity evaluate = getClientUtil().createProcessor("EvaluatePropertiesWithDifferentELScopes");
    final Map<String, String> evaluateProperties = new HashMap<>();
    evaluateProperties.put("FlowFile Context", "#{A}");
    evaluateProperties.put("Variable Registry Context", "#{A Replace With 5}");
    evaluateProperties.put("Expression Language Not Evaluated", "#{Eleven A}");
    getClientUtil().updateProcessorProperties(evaluate, evaluateProperties);

    getClientUtil().createConnection(generate, evaluate, "success");
    getClientUtil().setAutoTerminatedRelationships(evaluate, "success");

    final Set<ParameterEntity> parameters = new HashSet<>();
    parameters.add(createParameterEntity("A", null, false, "${a}"));
    parameters.add(createParameterEntity("A Replace With 5", null, false, "${a:replaceNull(5)}"));
    parameters.add(createParameterEntity("Eleven A", null, false, "11${a}"));
    final ParameterContextEntity contextEntity = createParameterContextEntity(getTestName(), null, parameters);
    final ParameterContextEntity createdContextEntity = getNifiClient().getParamContextClient().createParamContext(contextEntity);
    setParameterContext("root", createdContextEntity);

    waitForValidProcessor(generate.getId());
    waitForValidProcessor(evaluate.getId());
    getClientUtil().startProcessGroupComponents("root");

    waitFor(() -> {
        try {
            // A counter that has not been reported yet means "keep waiting"; check for it explicitly
            // rather than relying on a NullPointerException from unboxing being swallowed below.
            final Long flowFileCount = getClientUtil().getCountersAsMap(evaluate.getId()).get("flowfile");
            return flowFileCount != null && flowFileCount.longValue() == getNumberOfNodes();
        } catch (final Exception e) {
            // A failed counter lookup is treated as "not yet" so that the poll is retried.
            return false;
        }
    });

    final Map<String, Long> counters = getClientUtil().getCountersAsMap(evaluate.getId());
    assertEquals(getNumberOfNodes(), counters.get("flowfile").longValue());
    assertEquals(5L * getNumberOfNodes(), counters.get("variable.registry").longValue()); // Since no value present in variable registry, will replace null with 5.
    assertFalse(counters.containsKey("no.el.evaluation")); // Should not be evaluated

    // Update parameters so that Eleven A has the value 11 without evaluating EL.
    final Set<ParameterEntity> updatedParameters = new HashSet<>();
    updatedParameters.add(createParameterEntity("A", null, false, "${a}"));
    updatedParameters.add(createParameterEntity("A Replace With 5", null, false, "${a:replaceNull(5)}"));
    updatedParameters.add(createParameterEntity("Eleven A", null, false, "11"));
    final ParameterContextEntity updatedContextEntity = createParameterContextEntity(getTestName() + "-2", null, updatedParameters);
    final ParameterContextEntity secondContextEntity = getNifiClient().getParamContextClient().createParamContext(updatedContextEntity);

    // Stop process group so we can change the Parameter Context, then change the context and restart.
    getClientUtil().stopProcessGroupComponents("root");
    setParameterContext("root", secondContextEntity);
    getClientUtil().startProcessGroupComponents("root");

    // Wait for the 'no.el.evaluation' counter to be set
    waitFor(() -> {
        try {
            final Long noElCount = getClientUtil().getCountersAsMap(evaluate.getId()).get("no.el.evaluation");
            return noElCount != null && noElCount.longValue() == 11 * getNumberOfNodes();
        } catch (final Exception e) {
            // A failed counter lookup is treated as "not yet" so that the poll is retried.
            return false;
        }
    });
}
/**
 * Exercises a parameter referenced by GenerateFlowFile's "Text" property.
 *
 * <p>The processor is expected to be invalid while the "Text" parameter does not exist, valid once it
 * exists with a null value (the written file is then 1024 bytes, matching the "1 KB" File Size), and
 * to emit the parameter's text once it is given a concrete value. Output is observed through the file
 * written by a downstream WriteToFile processor.</p>
 */
@Test
public void testParameterWithOptionalProperty() throws NiFiClientException, IOException, InterruptedException {
    final ProcessorEntity generate = getClientUtil().createProcessor("GenerateFlowFile");
    getClientUtil().updateProcessorSchedulingPeriod(generate, "10 min");
    final Map<String, String> generateProperties = new HashMap<>();
    generateProperties.put("Text", "#{Text}");
    generateProperties.put("File Size", "1 KB");
    getClientUtil().updateProcessorProperties(generate, generateProperties);

    final ProcessorEntity writeFile = getClientUtil().createProcessor("WriteToFile");
    final File file = new File("target/testParameterWithOptionalProperty.txt");
    // Remove any output left behind by an earlier run so that the size checks below can only be
    // satisfied by a file written during this run.
    Files.deleteIfExists(file.toPath());
    getClientUtil().updateProcessorProperties(writeFile, Collections.singletonMap("Filename", file.getAbsolutePath()));

    getClientUtil().createConnection(generate, writeFile, "success");
    getClientUtil().setAutoTerminatedRelationships(writeFile, new HashSet<>(Arrays.asList("success", "failure")));

    final Set<ParameterEntity> parameters = new HashSet<>();
    final ParameterContextEntity contextEntity = createParameterContextEntity(getTestName(), null, parameters);
    final ParameterContextEntity createdContextEntity = getNifiClient().getParamContextClient().createParamContext(contextEntity);
    setParameterContext("root", createdContextEntity);

    // Processor should be invalid because it references a Parameter (Text) that does not exist
    waitForInvalidProcessor(generate.getId());

    // Update the Parameter Context to add new parameter but with null value.
    final ParameterEntity nullText = createParameterEntity("Text", "Text", false, null);
    createdContextEntity.getComponent().getParameters().add(nullText);
    getNifiClient().getParamContextClient().updateParamContext(createdContextEntity);

    waitForValidProcessor(generate.getId());
    getClientUtil().startProcessGroupComponents("root");

    // Ensure that the file is written with a file size of 1 KB
    waitFor(() -> {
        try {
            return file.exists() && file.length() == 1024;
        } catch (final Exception e) {
            return false;
        }
    });

    // Update Parameter to have a specific value
    createdContextEntity.getComponent().getParameters().remove(nullText);
    final String customText = "Some Custom Text";
    createdContextEntity.getComponent().getParameters().add(createParameterEntity("Text", "Text", false, customText));
    getNifiClient().getParamContextClient().updateParamContext(createdContextEntity);

    // Wait for file to be written out
    waitFor(() -> {
        try {
            final boolean correctSize = file.exists() && file.length() == customText.length();
            if (!correctSize) {
                return false;
            }

            final List<String> lines = Files.readAllLines(file.toPath());
            if (lines.size() != 1) {
                return false;
            }

            return customText.equals(lines.get(0));
        } catch (final Exception e) {
            // The file may be mid-write or momentarily unreadable; treat that as "not yet" and poll again.
            return false;
        }
    });
}
/**
 * Starts a Sleep processor whose "Validate Sleep Time" property references the parameter "sleep",
 * updates that parameter, and checks that the processor is the only referencing component of the
 * update request and that it is running once the request has completed.
 */
@Test
public void testProcessorStartedAfterLongValidationPeriod() throws NiFiClientException, IOException, InterruptedException {
    final ParameterContextEntity sleepContext = createParameterContext("sleep", "6 secs");

    // Bind the Parameter Context to the root Process Group.
    setParameterContext("root", sleepContext);

    final ProcessorEntity sleepProcessor = createProcessor(TEST_PROCESSORS_PACKAGE + ".Sleep", NIFI_GROUP_ID, TEST_EXTENSIONS_ARTIFACT_ID, getNiFiVersion());
    final String processorId = sleepProcessor.getId();

    // Point the processor's validation sleep time at the "sleep" parameter and auto-terminate "success".
    final ProcessorConfigDTO processorConfig = sleepProcessor.getComponent().getConfig();
    processorConfig.setProperties(Collections.singletonMap("Validate Sleep Time", "#{sleep}"));
    processorConfig.setAutoTerminatedRelationships(Collections.singleton("success"));
    getNifiClient().getProcessorClient().updateProcessor(sleepProcessor);

    waitForValidProcessor(processorId);

    // Start the processor.
    final String clientId = sleepProcessor.getRevision().getClientId();
    getNifiClient().getProcessorClient().startProcessor(processorId, clientId, 1);

    try {
        // Update the parameter referenced by the running processor.
        final ParameterContextUpdateRequestEntity updateRequest = updateParameterContext(sleepContext, "sleep", "6 sec");

        // The processor must be reported as the one and only referencing component.
        final Set<AffectedComponentEntity> referencing = updateRequest.getRequest().getReferencingComponents();
        assertEquals(1, referencing.size());
        final AffectedComponentEntity onlyAffected = referencing.iterator().next();
        assertEquals(processorId, onlyAffected.getId());

        final String requestId = updateRequest.getRequest().getRequestId();
        getClientUtil().waitForParameterContextRequestToComplete(sleepContext.getId(), requestId);

        waitForRunningProcessor(processorId);
    } finally {
        // Always stop and delete the processor so that other tests are free to change the Parameter Context, etc.
        getNifiClient().getProcessorClient().stopProcessor(processorId, clientId, 2);
        waitForStoppedProcessor(processorId);
        getNifiClient().getProcessorClient().deleteProcessor(processorId, clientId, 3);
    }
}
/**
 * Runs a Sleep processor that depends on a StandardSleepService whose "Validate Sleep Time" references
 * the parameter "sleep", raises that parameter from "0 secs" to "6 secs", and checks that both the
 * service and the processor are reported as referencing components and that the processor is running
 * afterwards.
 */
@Test
public void testProcessorRestartedAfterLongDependentServiceValidationPeriod() throws NiFiClientException, IOException, InterruptedException {
    final ParameterContextEntity sleepContext = createParameterContext("sleep", "0 secs");

    // Bind the Parameter Context to the root Process Group.
    setParameterContext("root", sleepContext);

    final ControllerServiceEntity sleepService = createControllerService(TEST_CS_PACKAGE + ".StandardSleepService", "root", NIFI_GROUP_ID, TEST_EXTENSIONS_ARTIFACT_ID, getNiFiVersion());
    final String serviceId = sleepService.getId();

    // Bind the service's validation sleep time to the parameter, then enable the service.
    sleepService.getComponent().setProperties(Collections.singletonMap("Validate Sleep Time", "#{sleep}"));
    getNifiClient().getControllerServicesClient().updateControllerService(sleepService);
    getClientUtil().enableControllerService(sleepService);

    try {
        // Create a processor that uses the service, and start it.
        final ProcessorEntity sleepProcessor = createProcessor(TEST_PROCESSORS_PACKAGE + ".Sleep", NIFI_GROUP_ID, TEST_EXTENSIONS_ARTIFACT_ID, getNiFiVersion());
        final String processorId = sleepProcessor.getId();
        sleepProcessor.getComponent().getConfig().setProperties(Collections.singletonMap("Sleep Service", serviceId));
        sleepProcessor.getComponent().getConfig().setAutoTerminatedRelationships(Collections.singleton("success"));
        getNifiClient().getProcessorClient().updateProcessor(sleepProcessor);

        final String clientId = sleepProcessor.getRevision().getClientId();
        getNifiClient().getProcessorClient().startProcessor(processorId, clientId, 1L);

        try {
            final ParameterContextUpdateRequestEntity updateRequest = updateParameterContext(sleepContext, "sleep", "6 secs");
            final Set<AffectedComponentEntity> referencing = updateRequest.getRequest().getReferencingComponents();
            assertEquals(2, referencing.size());

            // Collect the ids of everything the update reports as referencing the parameter.
            final Set<String> referencingIds = new HashSet<>();
            for (final AffectedComponentEntity affected : referencing) {
                referencingIds.add(affected.getId());
            }

            assertTrue(referencingIds.contains(serviceId));
            assertTrue(referencingIds.contains(processorId));

            waitForRunningProcessor(processorId);
        } finally {
            // Stop and remove the processor before the service is torn down.
            getNifiClient().getProcessorClient().stopProcessor(processorId, clientId, 1L);
            waitForStoppedProcessor(processorId);
            getNifiClient().getProcessorClient().deleteProcessor(processorId, clientId, 3);
        }
    } finally {
        getClientUtil().disableControllerService(sleepService);
        getNifiClient().getControllerServicesClient().deleteControllerService(sleepService);
    }
}
/**
 * Changes the parameter "sleep" while a StandardSleepService that references it through
 * "@OnEnabled Sleep Time" is in the ENABLING state, and waits for the update request to complete.
 */
@Test
public void testParamChangeWhileReferencingControllerServiceEnabling() throws NiFiClientException, IOException, InterruptedException {
    final ParameterContextEntity sleepContext = createParameterContext("sleep", "7 sec");

    // Bind the Parameter Context to the root Process Group.
    setParameterContext("root", sleepContext);

    final ControllerServiceEntity sleepService = createControllerService(TEST_CS_PACKAGE + ".StandardSleepService", "root", NIFI_GROUP_ID, TEST_EXTENSIONS_ARTIFACT_ID, getNiFiVersion());

    // Bind the service's @OnEnabled sleep time to the parameter.
    sleepService.getComponent().setProperties(Collections.singletonMap("@OnEnabled Sleep Time", "#{sleep}"));
    getNifiClient().getControllerServicesClient().updateControllerService(sleepService);

    // Enable the service; with the parameter at 7 sec it is expected to take 7 seconds to fully enable.
    getClientUtil().enableControllerService(sleepService);

    // Wait until the service reports ENABLING (not yet enabled) so that the parameter changes mid-enable.
    getClientUtil().waitForControllerServiceState(sleepService.getParentGroupId(), "ENABLING", Collections.emptyList());

    // Change the parameter while the service is still enabling.
    final ParameterContextUpdateRequestEntity updateRequest = updateParameterContext(sleepContext, "sleep", "1 sec");

    // Block until the update request has completed.
    final String requestId = updateRequest.getRequest().getRequestId();
    getClientUtil().waitForParameterContextRequestToComplete(sleepContext.getId(), requestId);
}
/**
 * Runs the shared parameter-change scenario with the referencing Controller Service being disabled
 * before the parameter is updated.
 */
@Test
public void testParamChangeWhileReferencingControllerServiceDisabling() throws NiFiClientException, IOException, InterruptedException {
    final boolean disableServiceBeforeUpdate = true;
    testParamChangeWhileReferencingControllerServiceDisabling(disableServiceBeforeUpdate);
}
/**
 * Runs the shared parameter-change scenario with the referencing Controller Service left enabled
 * while the parameter is updated.
 */
@Test
public void testParamChangeWhileReferencingControllerServiceEnabled() throws NiFiClientException, IOException, InterruptedException {
    final boolean disableServiceBeforeUpdate = false;
    testParamChangeWhileReferencingControllerServiceDisabling(disableServiceBeforeUpdate);
}
/**
 * Shared scenario: a StandardSleepService in a child Process Group references the parameter "sleep"
 * through "@OnDisabled Sleep Time"; the parameter is then changed and the update request awaited.
 *
 * @param disableServiceBeforeUpdate when {@code true}, the service is disabled and the parameter is
 *                                   changed once the service reports DISABLING; when {@code false},
 *                                   the parameter is changed while the service is ENABLED
 */
private void testParamChangeWhileReferencingControllerServiceDisabling(final boolean disableServiceBeforeUpdate) throws NiFiClientException, IOException, InterruptedException {
    final ParameterContextEntity sleepContext = createParameterContext("sleep", "7 sec");

    // Bind the Parameter Context to a newly created child Process Group under root.
    final ProcessGroupEntity child = getClientUtil().createProcessGroup("child", "root");
    setParameterContext(child.getId(), sleepContext);

    final ControllerServiceEntity sleepService = createControllerService(TEST_CS_PACKAGE + ".StandardSleepService", child.getId(),
        NIFI_GROUP_ID, TEST_EXTENSIONS_ARTIFACT_ID, getNiFiVersion());

    // Bind the service's @OnDisabled sleep time to the parameter.
    sleepService.getComponent().setProperties(Collections.singletonMap("@OnDisabled Sleep Time", "#{sleep}"));
    getNifiClient().getControllerServicesClient().updateControllerService(sleepService);

    // Enable the service and wait until it reports ENABLED.
    getClientUtil().enableControllerService(sleepService);
    getClientUtil().waitForControllerServiceState(sleepService.getParentGroupId(), "ENABLED", Collections.emptyList());

    if (disableServiceBeforeUpdate) {
        // Disable the service and wait for DISABLING (not yet DISABLED) so that the parameter changes mid-disable.
        getClientUtil().disableControllerService(sleepService);
        getClientUtil().waitForControllerServiceState(sleepService.getParentGroupId(), "DISABLING", Collections.emptyList());
    }

    // Change the parameter, then block until the update request has completed.
    final ParameterContextUpdateRequestEntity updateRequest = updateParameterContext(sleepContext, "sleep", "1 sec");
    final String requestId = updateRequest.getRequest().getRequestId();
    getClientUtil().waitForParameterContextRequestToComplete(sleepContext.getId(), requestId);
}
/**
 * Starts a SplitByLine processor whose "Use Clone" property references the parameter "clone", changes
 * the parameter to "invalid" and then back to "true", and waits for each update request to complete.
 */
@Test
public void testParamChangeWhileReferencingProcessorStartingButInvalid() throws NiFiClientException, IOException, InterruptedException {
    final ParameterContextEntity cloneContext = createParameterContext("clone", "true");

    // Bind the Parameter Context to the root Process Group.
    setParameterContext("root", cloneContext);

    // Build the flow GenerateFlowFile -> SplitByLine -> <auto-terminate>, with SplitByLine's
    // "Use Clone" property taking its (currently valid) value from the parameter.
    final ProcessorEntity generateFlowFile = getClientUtil().createProcessor("GenerateFlowFile");
    final ProcessorEntity splitter = getClientUtil().createProcessor("SplitByLine");

    getClientUtil().updateProcessorProperties(splitter, Collections.singletonMap("Use Clone", "#{clone}"));
    getClientUtil().setAutoTerminatedRelationships(splitter, Collections.singleton("success"));
    getClientUtil().createConnection(generateFlowFile, splitter, "success");

    getNifiClient().getProcessorClient().startProcessor(splitter);

    // Switch the parameter to an invalid value. The processor is expected to be stopped, become invalid,
    // and then be moved to a 'starting' state while still invalid.
    final ParameterContextUpdateRequestEntity toInvalid = updateParameterContext(cloneContext, "clone", "invalid");
    final String toInvalidRequestId = toInvalid.getRequest().getRequestId();
    getClientUtil().waitForParameterContextRequestToComplete(cloneContext.getId(), toInvalidRequestId);

    // Restore a valid value and wait for that update to complete as well.
    final ParameterContextUpdateRequestEntity toValid = updateParameterContext(cloneContext, "clone", "true");
    final String toValidRequestId = toValid.getRequest().getRequestId();
    getClientUtil().waitForParameterContextRequestToComplete(cloneContext.getId(), toValidRequestId);
}
/**
 * Runs the shared restart scenario with the processor property referencing the parameter directly.
 */
@Test
public void testProcessorRestartedWhenParameterChanged() throws NiFiClientException, IOException, InterruptedException {
    final String directReference = "#{name}";
    testProcessorRestartedWhenParameterChanged(directReference);
}
/**
 * Runs the shared restart scenario with the parameter referenced from inside an Expression Language
 * expression.
 */
@Test
public void testProcessorRestartedWhenParameterChangedWhenReferencedThroughEL() throws NiFiClientException, IOException, InterruptedException {
    final String referenceInsideExpression = "${'hello':equals(#{name})}";
    testProcessorRestartedWhenParameterChanged(referenceInsideExpression);
}
/**
 * Shared scenario: two CountEvents processors are started, only one of which references the parameter
 * "name". After the parameter changes from "bar" to "baz", the referencing processor must be the sole
 * affected component, be RUNNING again, and show one "Stopped" and two "Scheduled" counts per node,
 * while the other processor shows a single "Scheduled" count per node and no "Stopped" counter.
 *
 * @param propertyValue the value assigned to the referencing processor's "Name" property
 */
private void testProcessorRestartedWhenParameterChanged(final String propertyValue) throws NiFiClientException, IOException, InterruptedException {
    final Set<ParameterEntity> initialParameters = new HashSet<>();
    initialParameters.add(createParameterEntity("name", null, false, "bar"));
    final ParameterContextEntity toCreate = createParameterContextEntity(getTestName(), null, initialParameters);
    final ParameterContextEntity context = getNifiClient().getParamContextClient().createParamContext(toCreate);

    // Bind the Parameter Context to the root Process Group.
    setParameterContext("root", context);

    // First processor: its "Name" property carries the supplied parameter reference.
    final ProcessorEntity referencing = createProcessor(TEST_PROCESSORS_PACKAGE + ".CountEvents", NIFI_GROUP_ID, TEST_EXTENSIONS_ARTIFACT_ID, getNiFiVersion());
    final String processorId = referencing.getId();
    getClientUtil().updateProcessorProperties(referencing, Collections.singletonMap("Name", propertyValue));
    waitForValidProcessor(processorId);

    // Second processor: references no parameters at all.
    final ProcessorEntity bystander = createProcessor(TEST_PROCESSORS_PACKAGE + ".CountEvents", NIFI_GROUP_ID, TEST_EXTENSIONS_ARTIFACT_ID, getNiFiVersion());

    // Start both processors.
    getNifiClient().getProcessorClient().startProcessor(referencing.getId(), referencing.getRevision().getClientId(), 1L);
    getNifiClient().getProcessorClient().startProcessor(bystander.getId(), bystander.getRevision().getClientId(), 1L);

    final Map<String, Long> countersAfterStart = waitForCounter(referencing.getId(), "Scheduled", getNumberOfNodes());
    assertFalse(countersAfterStart.containsKey("Stopped"));

    // Swap the parameter's value from bar to baz.
    final Set<ParameterEntity> contextParameters = context.getComponent().getParameters();
    contextParameters.clear();
    contextParameters.add(createParameterEntity("name", "Changed Value from bar to baz", false, "baz"));
    final ParameterContextUpdateRequestEntity updateRequest = getNifiClient().getParamContextClient().updateParamContext(context);
    final String requestId = updateRequest.getRequest().getRequestId();

    // Only the referencing processor may be listed as affected.
    final Set<AffectedComponentEntity> affected = updateRequest.getRequest().getReferencingComponents();
    assertEquals(1, affected.size());
    final AffectedComponentEntity onlyAffected = affected.iterator().next();
    assertEquals(referencing.getId(), onlyAffected.getId());

    // Wait for the update to finish, then delete the update request.
    getClientUtil().waitForParameterContextRequestToComplete(context.getId(), requestId);
    getNifiClient().getParamContextClient().deleteParamContextUpdateRequest(context.getId(), requestId);

    // The referencing processor must be running again.
    final ProcessorEntity refreshed = getNifiClient().getProcessorClient().getProcessor(processorId);
    assertEquals("RUNNING", refreshed.getComponent().getState());

    // It must have been stopped once and scheduled twice, i.e. restarted. The second "Scheduled" count
    // may lag behind depending on timing, so that counter is waited for rather than asserted directly.
    final Map<String, Long> countersAfterUpdate = getCountersAsMap(refreshed.getId());
    assertEquals(getNumberOfNodes(), countersAfterUpdate.get("Stopped").longValue());
    waitForCounter(refreshed.getId(), "Scheduled", getNumberOfNodes() * 2);

    // The bystander must have been scheduled exactly once and never stopped.
    final Map<String, Long> bystanderCounters = getCountersAsMap(bystander.getId());
    assertFalse(bystanderCounters.containsKey("Stopped"));
    assertEquals(getNumberOfNodes(), bystanderCounters.get("Scheduled").longValue());
}
/**
 * Delegates to the client utility's {@code waitForCounter}.
 *
 * @param context       the counter context; callers in this class pass a processor id
 * @param counterName   the name of the counter to wait on
 * @param expectedValue the value the counter is expected to reach
 * @return the counter map handed back by the client utility
 */
private Map<String, Long> waitForCounter(final String context, final String counterName, final long expectedValue) throws NiFiClientException, IOException, InterruptedException {
    final Map<String, Long> counters = getClientUtil().waitForCounter(context, counterName, expectedValue);
    return counters;
}
/**
 * Delegates to the client utility's {@code getCountersAsMap}.
 *
 * @param processorId the id of the processor whose counters are requested
 * @return the counter map handed back by the client utility
 */
private Map<String, Long> getCountersAsMap(final String processorId) throws NiFiClientException, IOException {
    final Map<String, Long> counters = getClientUtil().getCountersAsMap(processorId);
    return counters;
}
/**
 * Delegates to the client utility's {@code createProcessor}.
 *
 * @param type       the processor type; callers in this class pass a fully qualified class name
 * @param groupId    the bundle group id
 * @param artifactId the bundle artifact id
 * @param version    the bundle version
 * @return the processor entity handed back by the client utility
 */
private ProcessorEntity createProcessor(final String type, final String groupId, final String artifactId, final String version) throws NiFiClientException, IOException {
    final ProcessorEntity created = getClientUtil().createProcessor(type, groupId, artifactId, version);
    return created;
}
/**
 * Delegates to the client utility's {@code createControllerService}.
 *
 * @param type           the Controller Service type; callers in this class pass a fully qualified class name
 * @param processGroupId the id of the Process Group in which the service is created
 * @param bundleGroupId  the bundle group id
 * @param artifactId     the bundle artifact id
 * @param version        the bundle version
 * @return the Controller Service entity handed back by the client utility
 */
public ControllerServiceEntity createControllerService(final String type, final String processGroupId, final String bundleGroupId, final String artifactId, final String version)
        throws NiFiClientException, IOException {
    final ControllerServiceEntity created = getClientUtil().createControllerService(type, processGroupId, bundleGroupId, artifactId, version);
    return created;
}
/**
 * Delegates to the client utility's {@code createParameterEntity}.
 *
 * @param name        the parameter name
 * @param description the parameter description; callers in this class sometimes pass {@code null}
 * @param sensitive   whether the parameter is sensitive
 * @param value       the parameter value; callers in this class sometimes pass {@code null}
 * @return the parameter entity handed back by the client utility
 */
public ParameterEntity createParameterEntity(final String name, final String description, final boolean sensitive, final String value) {
    final ParameterEntity parameter = getClientUtil().createParameterEntity(name, description, sensitive, value);
    return parameter;
}
/**
 * Delegates to the client utility's {@code createParameterContextEntity}.
 *
 * @param name        the Parameter Context name
 * @param description the Parameter Context description; callers in this class sometimes pass {@code null}
 * @param parameters  the parameters the context should contain
 * @return the Parameter Context entity handed back by the client utility
 */
public ParameterContextEntity createParameterContextEntity(final String name, final String description, final Set<ParameterEntity> parameters) {
    final ParameterContextEntity contextEntity = getClientUtil().createParameterContextEntity(name, description, parameters);
    return contextEntity;
}
/**
 * Delegates to the client utility's {@code setParameterContext}.
 *
 * @param groupId          the id of the Process Group; callers in this class pass "root" or a child group id
 * @param parameterContext the Parameter Context to associate with the group
 * @return the Process Group entity handed back by the client utility
 */
private ProcessGroupEntity setParameterContext(final String groupId, final ParameterContextEntity parameterContext) throws NiFiClientException, IOException {
    final ProcessGroupEntity group = getClientUtil().setParameterContext(groupId, parameterContext);
    return group;
}
/**
 * Creates a Parameter Context containing a single parameter by wrapping the name/value pair in a
 * singleton map and delegating to {@link #createParameterContext(Map)}.
 *
 * @param parameterName  the name of the only parameter
 * @param parameterValue the value of the only parameter
 * @return the entity returned by {@link #createParameterContext(Map)}
 */
public ParameterContextEntity createParameterContext(final String parameterName, final String parameterValue) throws NiFiClientException, IOException {
    final Map<String, String> singleParameter = Collections.singletonMap(parameterName, parameterValue);
    return createParameterContext(singleParameter);
}
/**
 * Delegates to the client utility's {@code createParameterContext}, using the current test's name as
 * the context name.
 *
 * @param parameters parameter names mapped to their values
 * @return the Parameter Context entity handed back by the client utility
 */
public ParameterContextEntity createParameterContext(final Map<String, String> parameters) throws NiFiClientException, IOException {
    final String contextName = getTestName();
    return getClientUtil().createParameterContext(contextName, parameters);
}
/**
 * Updates a single parameter of an existing Parameter Context by wrapping the name/value pair in a
 * singleton map and delegating to {@link #updateParameterContext(ParameterContextEntity, Map)}.
 *
 * @param existingEntity the Parameter Context to update
 * @param paramName      the name of the parameter to set
 * @param paramValue     the value to assign to the parameter
 * @return the update request entity returned by the map-based overload
 */
public ParameterContextUpdateRequestEntity updateParameterContext(final ParameterContextEntity existingEntity, final String paramName, final String paramValue)
        throws NiFiClientException, IOException {
    final Map<String, String> singleParameter = Collections.singletonMap(paramName, paramValue);
    return updateParameterContext(existingEntity, singleParameter);
}
/**
 * Delegates to the client utility's {@code updateParameterContext}.
 *
 * @param existingEntity the Parameter Context to update
 * @param parameters     parameter names mapped to their new values
 * @return the update request entity handed back by the client utility
 */
public ParameterContextUpdateRequestEntity updateParameterContext(final ParameterContextEntity existingEntity, final Map<String, String> parameters) throws NiFiClientException, IOException {
    final ParameterContextUpdateRequestEntity updateRequest = getClientUtil().updateParameterContext(existingEntity, parameters);
    return updateRequest;
}
/**
 * Delegates to the client utility's {@code waitForValidProcessor} for the given processor.
 *
 * @param id the id of the processor to wait on
 */
private void waitForValidProcessor(String id) throws InterruptedException, IOException, NiFiClientException {
getClientUtil().waitForValidProcessor(id);
}
/**
 * Delegates to the client utility's {@code waitForInvalidProcessor} for the given processor.
 *
 * @param id the id of the processor to wait on
 */
private void waitForInvalidProcessor(String id) throws NiFiClientException, IOException, InterruptedException {
getClientUtil().waitForInvalidProcessor(id);
}
/**
 * Delegates to the client utility's {@code waitForRunningProcessor} for the given processor.
 *
 * @param processorId the id of the processor to wait on
 */
private void waitForRunningProcessor(final String processorId) throws InterruptedException, IOException, NiFiClientException {
getClientUtil().waitForRunningProcessor(processorId);
}
/**
 * Delegates to the client utility's {@code waitForStoppedProcessor} for the given processor.
 *
 * @param processorId the id of the processor to wait on
 */
private void waitForStoppedProcessor(final String processorId) throws InterruptedException, IOException, NiFiClientException {
getClientUtil().waitForStoppedProcessor(processorId);
}
}