NIFI-8774: Fixed NullPointerExceptions
Signed-off-by: Matthew Burgess <mattyb149@apache.org>
This closes #5208
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
index 1a7bd2d..eb5b492 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
@@ -164,7 +164,6 @@
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -229,7 +228,8 @@
private volatile FlowFileOutboundPolicy flowFileOutboundPolicy = FlowFileOutboundPolicy.STREAM_WHEN_AVAILABLE;
private volatile BatchCounts batchCounts = new NoOpBatchCounts();
private final DataValve dataValve;
- private final Map<String, String> niFiPropertiesBackPressure;
+ private final Long nifiPropertiesBackpressureCount;
+ private final String nifiPropertiesBackpressureSize;
private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
private final Lock readLock = rwLock.readLock();
@@ -237,21 +237,15 @@
private static final Logger LOG = LoggerFactory.getLogger(StandardProcessGroup.class);
private static final String DEFAULT_FLOWFILE_EXPIRATION = "0 sec";
- private static final String DEFAULT_BACKPRESSURE_OBJECT = "0";
- private static final String DEFAULT_BACKPRESSURE_DATA_SIZE = "0 GB";
+ private static final long DEFAULT_BACKPRESSURE_OBJECT = 10_000L;
+ private static final String DEFAULT_BACKPRESSURE_DATA_SIZE = "1 GB";
+
public StandardProcessGroup(final String id, final ControllerServiceProvider serviceProvider, final ProcessScheduler scheduler,
final PropertyEncryptor encryptor, final ExtensionManager extensionManager,
final StateManagerProvider stateManagerProvider, final FlowManager flowManager, final FlowRegistryClient flowRegistryClient,
- final ReloadComponent reloadComponent, final MutableVariableRegistry variableRegistry, final NodeTypeProvider nodeTypeProvider) {
- this(id, serviceProvider, scheduler, encryptor, extensionManager, stateManagerProvider, flowManager, flowRegistryClient,
- reloadComponent, variableRegistry, nodeTypeProvider, null);
- }
- public StandardProcessGroup(final String id, final ControllerServiceProvider serviceProvider, final ProcessScheduler scheduler,
- final PropertyEncryptor encryptor, final ExtensionManager extensionManager,
- final StateManagerProvider stateManagerProvider, final FlowManager flowManager, final FlowRegistryClient flowRegistryClient,
final ReloadComponent reloadComponent, final MutableVariableRegistry variableRegistry, final NodeTypeProvider nodeTypeProvider,
- final NiFiProperties niFiProperties) {
+ final NiFiProperties nifiProperties) {
this.id = id;
this.controllerServiceProvider = serviceProvider;
@@ -276,14 +270,36 @@
this.defaultFlowFileExpiration = new AtomicReference<>();
this.defaultBackPressureObjectThreshold = new AtomicReference<>();
this.defaultBackPressureDataSizeThreshold = new AtomicReference<>();
+
// save only the nifi properties needed, and account for the possibility those properties are missing
- niFiPropertiesBackPressure = new ConcurrentHashMap<>();
- niFiPropertiesBackPressure.put(NiFiProperties.BACKPRESSURE_COUNT,
- niFiProperties.getProperty(NiFiProperties.BACKPRESSURE_COUNT) == null ? DEFAULT_BACKPRESSURE_OBJECT : niFiProperties.getProperty(NiFiProperties.BACKPRESSURE_COUNT));
- niFiPropertiesBackPressure.put(NiFiProperties.BACKPRESSURE_SIZE,
- niFiProperties.getProperty(NiFiProperties.BACKPRESSURE_SIZE) == null ? DEFAULT_BACKPRESSURE_DATA_SIZE : niFiProperties.getProperty(NiFiProperties.BACKPRESSURE_SIZE));
+ if (nifiProperties == null) {
+ nifiPropertiesBackpressureCount = DEFAULT_BACKPRESSURE_OBJECT;
+ nifiPropertiesBackpressureSize = DEFAULT_BACKPRESSURE_DATA_SIZE;
+ } else {
+ // Validate the property values.
+ Long count;
+ try {
+ final String explicitValue = nifiProperties.getProperty(NiFiProperties.BACKPRESSURE_COUNT, String.valueOf(DEFAULT_BACKPRESSURE_OBJECT));
+ count = Long.parseLong(explicitValue);
+ } catch (final Exception e) {
+ LOG.warn("nifi.properties has an invalid value for the '" + NiFiProperties.BACKPRESSURE_COUNT + "' property. Using default value instaed.");
+ count = DEFAULT_BACKPRESSURE_OBJECT;
+ }
+ nifiPropertiesBackpressureCount = count;
+
+ String size;
+ try {
+ size = nifiProperties.getProperty(NiFiProperties.BACKPRESSURE_SIZE, DEFAULT_BACKPRESSURE_DATA_SIZE);
+ DataUnit.parseDataSize(size, DataUnit.B);
+ } catch (final Exception e) {
+ LOG.warn("nifi.properties has an invalid value for the '" + NiFiProperties.BACKPRESSURE_SIZE + "' property. Using default value instaed.");
+ size = DEFAULT_BACKPRESSURE_DATA_SIZE;
+ }
+ nifiPropertiesBackpressureSize = size;
+ }
}
+
@Override
public ProcessGroup getParent() {
return parent.get();
@@ -5642,15 +5658,9 @@
public void setDefaultBackPressureObjectThreshold(final Long defaultBackPressureObjectThreshold) {
// use default if value not provided
if (defaultBackPressureObjectThreshold == null) {
- this.defaultBackPressureObjectThreshold.set(Long.parseLong(niFiPropertiesBackPressure.get(NiFiProperties.BACKPRESSURE_COUNT)));
+ this.defaultBackPressureObjectThreshold.set(nifiPropertiesBackpressureCount);
} else {
- // Validate field is numeric
- Pattern pattern = Pattern.compile("(\\d+)");
- if (pattern.matcher(String.valueOf(defaultBackPressureObjectThreshold)).matches()) {
- this.defaultBackPressureObjectThreshold.set(defaultBackPressureObjectThreshold);
- } else {
- throw new IllegalArgumentException("The Default Back Pressure Object Threshold of the process group must be numeric.");
- }
+ this.defaultBackPressureObjectThreshold.set(defaultBackPressureObjectThreshold);
}
}
@@ -5659,28 +5669,23 @@
// Use value in this object if it has been set. Otherwise, inherit from parent group; if at root group, obtain from nifi properties.
if (defaultBackPressureObjectThreshold.get() == null) {
if (isRootGroup()) {
- return Long.parseLong(niFiPropertiesBackPressure.get(NiFiProperties.BACKPRESSURE_COUNT));
+ return nifiPropertiesBackpressureCount;
} else {
return getParent().getDefaultBackPressureObjectThreshold();
}
}
- return defaultBackPressureObjectThreshold.get();
+
+ return defaultBackPressureObjectThreshold.get();
}
@Override
public void setDefaultBackPressureDataSizeThreshold(final String defaultBackPressureDataSizeThreshold) {
// use default if value not provided
if (StringUtils.isBlank(defaultBackPressureDataSizeThreshold)) {
- this.defaultBackPressureDataSizeThreshold.set(niFiPropertiesBackPressure.get(NiFiProperties.BACKPRESSURE_SIZE));
+ this.defaultBackPressureDataSizeThreshold.set(nifiPropertiesBackpressureSize);
} else {
- // Validate entry: must include size unit label
- Pattern pattern = Pattern.compile(DataUnit.DATA_SIZE_REGEX);
- String caseAdjustedSizeThreshold = defaultBackPressureDataSizeThreshold.toUpperCase();
- if (pattern.matcher(caseAdjustedSizeThreshold).matches()) {
- this.defaultBackPressureDataSizeThreshold.set(caseAdjustedSizeThreshold);
- } else {
- throw new IllegalArgumentException("The Default Back Pressure Data Size Threshold of the process group must contain a valid data size unit.");
- }
+ DataUnit.parseDataSize(defaultBackPressureDataSizeThreshold, DataUnit.B);
+ this.defaultBackPressureDataSizeThreshold.set(defaultBackPressureDataSizeThreshold.toUpperCase());
}
}
@@ -5689,11 +5694,12 @@
// Use value in this object if it has been set. Otherwise, inherit from parent group; if at root group, obtain from nifi properties.
if (StringUtils.isEmpty(defaultBackPressureDataSizeThreshold.get())) {
if (isRootGroup()) {
- return niFiPropertiesBackPressure.get(NiFiProperties.BACKPRESSURE_SIZE);
+ return nifiPropertiesBackpressureSize;
} else {
return parent.get().getDefaultBackPressureDataSizeThreshold();
}
}
+
return defaultBackPressureDataSizeThreshold.get();
}
}
diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessFlowManager.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessFlowManager.java
index cccd80b..b132e17 100644
--- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessFlowManager.java
+++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessFlowManager.java
@@ -216,7 +216,8 @@
statelessEngine.getFlowRegistryClient(),
statelessEngine.getReloadComponent(),
mutableVariableRegistry,
- new StatelessNodeTypeProvider());
+ new StatelessNodeTypeProvider(),
+ null);
}
@Override
@@ -242,6 +243,7 @@
.source(requireNonNull(source))
.destination(destination)
.flowFileQueueFactory(flowFileQueueFactory)
+ .processGroup(destination.getProcessGroup())
.build();
return connection;