NIFI-9183: Add a command-line option to save status history
Fixed typo error.
Fixed error.
Fixed review items.
Fix contrib-check. Added missing test file to excludes.
Fix review items.
Fix review items: error handling, input validation, added more unit tests.
Improved status history repository creation in HeadlessNiFiServer.
diff --git a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java
index 7d3ecc8..8abc83c 100644
--- a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java
+++ b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java
@@ -18,6 +18,7 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.bootstrap.notification.NotificationType;
+import org.apache.nifi.bootstrap.util.DumpFileValidator;
import org.apache.nifi.bootstrap.util.OSUtils;
import org.apache.nifi.bootstrap.util.SecureNiFiConfigUtil;
import org.apache.nifi.util.file.FileUtils;
@@ -44,6 +45,7 @@
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.FileSystems;
import java.nio.file.Files;
+import java.nio.file.InvalidPathException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.FileAttribute;
@@ -90,6 +92,7 @@
public static final String DEFAULT_JAVA_CMD = "java";
public static final String DEFAULT_PID_DIR = "bin";
public static final String DEFAULT_LOG_DIR = "./logs";
+ public static final String DEFAULT_STATUS_HISTORY_DAYS = "1";
public static final String GRACEFUL_SHUTDOWN_PROP = "graceful.shutdown.seconds";
public static final String DEFAULT_GRACEFUL_SHUTDOWN_VALUE = "20";
@@ -119,9 +122,12 @@
public static final String DUMP_CMD = "DUMP";
public static final String DIAGNOSTICS_CMD = "DIAGNOSTICS";
public static final String IS_LOADED_CMD = "IS_LOADED";
+ public static final String STATUS_HISTORY_CMD = "STATUS_HISTORY";
private static final int UNINITIALIZED_CC_PORT = -1;
+ private static final int INVALID_CMD_ARGUMENT = -1;
+
private volatile boolean autoRestartNiFi = true;
private volatile int ccPort = UNINITIALIZED_CC_PORT;
private volatile long nifiPid = -1L;
@@ -175,7 +181,9 @@
System.out.println("Status : Determine if there is a running instance of Apache NiFi");
System.out.println("Dump : Write a Thread Dump to the file specified by [options], or to the log if no file is given");
System.out.println("Diagnostics : Write diagnostic information to the file specified by [options], or to the log if no file is given. The --verbose flag may be provided as an option before " +
- "the filename, which may result in additional diagnostic information being written.");
+ "the filename, which may result in additional diagnostic information being written.");
+ System.out.println("Status-history : Save the status history to the file specified by [options]. The expected command parameters are: " +
+ "status-history <number of days> <dumpFile>. The <number of days> parameter is optional and defaults to 1 day.");
System.out.println("Run : Start a new instance of Apache NiFi and monitor the Process, restarting if the instance dies");
System.out.println();
}
@@ -192,6 +200,7 @@
File dumpFile = null;
boolean verbose = false;
+ String statusHistoryDays = null;
final String cmd = args[0];
if (cmd.equalsIgnoreCase("dump")) {
@@ -216,6 +225,41 @@
dumpFile = null;
verbose = false;
}
+ } else if (cmd.equalsIgnoreCase("status-history")) {
+ if (args.length < 2) {
+ System.err.printf("Wrong number of arguments: %d instead of 1 or 2, the command parameters are: " +
+ "status-history <number of days> <dumpFile>%n", 0);
+ System.exit(INVALID_CMD_ARGUMENT);
+ }
+ if (args.length == 3) {
+ statusHistoryDays = args[1];
+ try {
+ final int numberOfDays = Integer.parseInt(statusHistoryDays);
+ if (numberOfDays < 1) {
+ System.err.println("The <number of days> parameter must be positive and greater than zero. The command parameters are:" +
+ " status-history <number of days> <dumpFile>");
+ System.exit(INVALID_CMD_ARGUMENT);
+ }
+ } catch (NumberFormatException e) {
+ System.err.println("The <number of days> parameter value is not a number. The command parameters are: status-history <number of days> <dumpFile>");
+ System.exit(INVALID_CMD_ARGUMENT);
+ }
+ try {
+ Paths.get(args[2]);
+ } catch (InvalidPathException e) {
+ System.err.println("Invalid filename. The command parameters are: status-history <number of days> <dumpFile>");
+ System.exit(INVALID_CMD_ARGUMENT);
+ }
+ dumpFile = new File(args[2]);
+ } else {
+ final boolean isValid = DumpFileValidator.validate(args[1]);
+ if (isValid) {
+ statusHistoryDays = DEFAULT_STATUS_HISTORY_DAYS;
+ dumpFile = new File(args[1]);
+ } else {
+ System.exit(INVALID_CMD_ARGUMENT);
+ }
+ }
}
switch (cmd.toLowerCase()) {
@@ -227,6 +271,7 @@
case "is_loaded":
case "dump":
case "diagnostics":
+ case "status-history":
case "restart":
case "env":
break;
@@ -272,6 +317,9 @@
case "diagnostics":
runNiFi.diagnostics(dumpFile, verbose);
break;
+ case "status-history":
+ runNiFi.statusHistory(dumpFile, statusHistoryDays);
+ break;
case "env":
runNiFi.env();
break;
@@ -729,6 +777,17 @@
makeRequest(DUMP_CMD, null, dumpFile, "thread dump");
}
+ /**
+ * Writes NiFi status history information to the given file.
+ *
+ * @param dumpFile the file to write the dump content to
+ * @throws IOException if any issues occur while writing the dump file
+ */
+ public void statusHistory(final File dumpFile, final String days) throws IOException {
+ // Due to input validation, the dumpFile cannot currently be null in this scenario.
+ makeRequest(STATUS_HISTORY_CMD, days, dumpFile, "status history information");
+ }
+
private boolean isNiFiFullyLoaded() throws IOException, NiFiNotRunningException {
final Logger logger = defaultLogger;
final Integer port = getCurrentPort(logger);
@@ -752,6 +811,7 @@
final Logger logger = defaultLogger; // dump to bootstrap log file by default
final Integer port = getCurrentPort(logger);
if (port == null) {
+ cmdLogger.info("Apache NiFi is not currently running");
logger.info("Apache NiFi is not currently running");
return;
}
diff --git a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/util/DumpFileValidator.java b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/util/DumpFileValidator.java
new file mode 100644
index 0000000..8eaef04
--- /dev/null
+++ b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/util/DumpFileValidator.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.bootstrap.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.InvalidPathException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
+public final class DumpFileValidator {
+
+ private static final Logger logger = LoggerFactory.getLogger(DumpFileValidator.class);
+
+ private DumpFileValidator() {
+ }
+
+ public static boolean validate(final String filePath) {
+ try {
+ final Path path = Paths.get(filePath);
+ return checkFileCanBeCreated(path);
+ } catch (InvalidPathException e) {
+ System.err.println("Invalid filename. The command parameters are: status-history <number of days> <dumpFile>");
+ return false;
+ }
+ }
+
+ private static boolean checkFileCanBeCreated(final Path path) {
+ try (final FileOutputStream outputStream = new FileOutputStream(path.toString());
+ final Closeable onClose = () -> Files.delete(path)) {
+ } catch (FileNotFoundException e) {
+ System.err.println("Invalid filename or there's no write permission to the currently selected file path.");
+ return false;
+ } catch (IOException e) {
+ logger.error("Could not delete file while validating file path.");
+ }
+ return true;
+ }
+}
diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/controller/status/history/StatusHistoryDump.java b/nifi-framework-api/src/main/java/org/apache/nifi/controller/status/history/StatusHistoryDump.java
new file mode 100644
index 0000000..c218115
--- /dev/null
+++ b/nifi-framework-api/src/main/java/org/apache/nifi/controller/status/history/StatusHistoryDump.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.status.history;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * Container for status history data which is capable to write it in an implementation dependent format.
+ */
+public interface StatusHistoryDump {
+
+ /**
+ * Writes a status history dump to an output stream.
+ *
+ * @param out the output stream
+ * @throws IOException if cannot serialize
+ */
+ void writeTo(final OutputStream out) throws IOException;
+}
\ No newline at end of file
diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/controller/status/history/StatusHistoryDumpFactory.java b/nifi-framework-api/src/main/java/org/apache/nifi/controller/status/history/StatusHistoryDumpFactory.java
new file mode 100644
index 0000000..83f8076
--- /dev/null
+++ b/nifi-framework-api/src/main/java/org/apache/nifi/controller/status/history/StatusHistoryDumpFactory.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.status.history;
+
+/**
+ * Factory class to create StatusHistoryDump instance.
+ */
+public interface StatusHistoryDumpFactory {
+
+ /**
+ * Creates a status history dump object.
+ *
+ * @param days number of backdating days
+ * @return the status history dump
+ */
+ StatusHistoryDump create(final int days);
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/test/java/org/apache/nifi/documentation/example/NiFiServerStub.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/test/java/org/apache/nifi/documentation/example/NiFiServerStub.java
index d82bec9..2a1964a 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/test/java/org/apache/nifi/documentation/example/NiFiServerStub.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/test/java/org/apache/nifi/documentation/example/NiFiServerStub.java
@@ -19,6 +19,7 @@
import org.apache.nifi.NiFiServer;
import org.apache.nifi.bundle.Bundle;
import org.apache.nifi.controller.DecommissionTask;
+import org.apache.nifi.controller.status.history.StatusHistoryDumpFactory;
import org.apache.nifi.diagnostics.DiagnosticsFactory;
import org.apache.nifi.nar.ExtensionMapping;
import org.apache.nifi.util.NiFiProperties;
@@ -60,4 +61,11 @@
public DecommissionTask getDecommissionTask() {
return null;
}
+
+ @Override
+ public StatusHistoryDumpFactory getStatusHistoryDumpFactory() {
+ return null;
+ }
+
+
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Cluster.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Cluster.java
index 00b76f1..7260927 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Cluster.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Cluster.java
@@ -25,6 +25,8 @@
import org.apache.nifi.cluster.coordination.flow.FlowElection;
import org.apache.nifi.cluster.coordination.flow.PopularVoteFlowElection;
import org.apache.nifi.cluster.coordination.node.ClusterRoles;
+import org.apache.nifi.controller.status.history.StatusHistoryRepository;
+import org.apache.nifi.controller.status.history.VolatileComponentStatusRepository;
import org.apache.nifi.encrypt.PropertyEncryptor;
import org.apache.nifi.encrypt.PropertyEncryptorFactory;
import org.apache.nifi.encrypt.SensitiveValueEncoder;
@@ -141,8 +143,8 @@
final ExtensionDiscoveringManager extensionManager = new StandardExtensionDiscoveringManager();
final FingerprintFactory fingerprintFactory = new FingerprintFactory(encryptor, extensionManager, sensitiveValueEncoder);
final FlowElection flowElection = new PopularVoteFlowElection(flowElectionTimeoutMillis, TimeUnit.MILLISECONDS, flowElectionMaxNodes, fingerprintFactory);
-
- final Node node = new Node(nifiProperties, extensionManager, flowElection);
+ final StatusHistoryRepository statusHistoryRepository = new VolatileComponentStatusRepository(nifiProperties);
+ final Node node = new Node(nifiProperties, extensionManager, flowElection, statusHistoryRepository);
node.start();
nodes.add(node);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java
index 8cec815..5e239b2 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java
@@ -46,6 +46,7 @@
import org.apache.nifi.controller.leader.election.CuratorLeaderElectionManager;
import org.apache.nifi.controller.leader.election.LeaderElectionManager;
import org.apache.nifi.controller.repository.FlowFileEventRepository;
+import org.apache.nifi.controller.status.history.StatusHistoryRepository;
import org.apache.nifi.encrypt.PropertyEncryptorFactory;
import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.events.EventReporter;
@@ -84,6 +85,7 @@
private final List<ReportedEvent> reportedEvents = Collections.synchronizedList(new ArrayList<ReportedEvent>());
private final RevisionManager revisionManager;
private final FlowElection flowElection;
+ private final StatusHistoryRepository statusHistoryRepository;
private NodeClusterCoordinator clusterCoordinator;
private NodeProtocolSender protocolSender;
@@ -98,11 +100,13 @@
private ScheduledExecutorService executor = new FlowEngine(8, "Node tasks", true);
- public Node(final NiFiProperties properties, final ExtensionDiscoveringManager extensionManager, final FlowElection flowElection) {
- this(createNodeId(), properties, extensionManager, flowElection);
+ public Node(final NiFiProperties properties, final ExtensionDiscoveringManager extensionManager, final FlowElection flowElection,
+ final StatusHistoryRepository statusHistoryRepository) {
+ this(createNodeId(), properties, extensionManager, flowElection, statusHistoryRepository);
}
- public Node(final NodeIdentifier nodeId, final NiFiProperties properties, final ExtensionDiscoveringManager extensionManager, final FlowElection flowElection) {
+ public Node(final NodeIdentifier nodeId, final NiFiProperties properties, final ExtensionDiscoveringManager extensionManager,
+ final FlowElection flowElection, final StatusHistoryRepository statusHistoryRepository) {
this.nodeId = nodeId;
this.nodeProperties = new NiFiProperties() {
@Override
@@ -137,6 +141,7 @@
electionManager = new CuratorLeaderElectionManager(4, nodeProperties);
this.flowElection = flowElection;
+ this.statusHistoryRepository = statusHistoryRepository;
}
@@ -156,7 +161,7 @@
flowController = FlowController.createClusteredInstance(Mockito.mock(FlowFileEventRepository.class), nodeProperties,
null, null, PropertyEncryptorFactory.getPropertyEncryptor(nodeProperties), protocolSender, Mockito.mock(BulletinRepository.class), clusterCoordinator,
heartbeatMonitor, electionManager, VariableRegistry.EMPTY_REGISTRY, Mockito.mock(FlowRegistryClient.class), extensionManager,
- revisionManager);
+ revisionManager, statusHistoryRepository);
try {
flowController.initializeFlow();
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 751b6aa..b409103 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -240,7 +240,6 @@
public static final String DEFAULT_CONTENT_REPO_IMPLEMENTATION = "org.apache.nifi.controller.repository.FileSystemRepository";
public static final String DEFAULT_PROVENANCE_REPO_IMPLEMENTATION = "org.apache.nifi.provenance.VolatileProvenanceRepository";
public static final String DEFAULT_SWAP_MANAGER_IMPLEMENTATION = "org.apache.nifi.controller.FileSystemSwapManager";
- public static final String DEFAULT_COMPONENT_STATUS_REPO_IMPLEMENTATION = "org.apache.nifi.controller.status.history.VolatileComponentStatusRepository";
private static final String ENCRYPTED_PROVENANCE_REPO_IMPLEMENTATION = "org.apache.nifi.provenance.EncryptedWriteAheadProvenanceRepository";
private static final String ENCRYPTED_CONTENT_REPO_IMPLEMENTATION = "org.apache.nifi.controller.repository.crypto.EncryptedFileSystemRepository";
@@ -393,7 +392,8 @@
final BulletinRepository bulletinRepo,
final VariableRegistry variableRegistry,
final FlowRegistryClient flowRegistryClient,
- final ExtensionManager extensionManager) {
+ final ExtensionManager extensionManager,
+ final StatusHistoryRepository statusHistoryRepository) {
return new FlowController(
flowFileEventRepo,
@@ -410,7 +410,8 @@
/* variable registry */ variableRegistry,
flowRegistryClient,
extensionManager,
- null);
+ null,
+ statusHistoryRepository);
}
public static FlowController createClusteredInstance(
@@ -427,7 +428,8 @@
final VariableRegistry variableRegistry,
final FlowRegistryClient flowRegistryClient,
final ExtensionManager extensionManager,
- final RevisionManager revisionManager) {
+ final RevisionManager revisionManager,
+ final StatusHistoryRepository statusHistoryRepository) {
final FlowController flowController = new FlowController(
flowFileEventRepo,
@@ -444,7 +446,8 @@
variableRegistry,
flowRegistryClient,
extensionManager,
- revisionManager);
+ revisionManager,
+ statusHistoryRepository);
return flowController;
}
@@ -465,7 +468,8 @@
final VariableRegistry variableRegistry,
final FlowRegistryClient flowRegistryClient,
final ExtensionManager extensionManager,
- final RevisionManager revisionManager) {
+ final RevisionManager revisionManager,
+ final StatusHistoryRepository statusHistoryRepository) {
maxTimerDrivenThreads = new AtomicInteger(10);
maxEventDrivenThreads = new AtomicInteger(1);
@@ -481,6 +485,7 @@
this.configuredForClustering = configuredForClustering;
this.flowRegistryClient = flowRegistryClient;
this.revisionManager = revisionManager;
+ this.statusHistoryRepository = statusHistoryRepository;
try {
// Form the container object from the properties
@@ -638,8 +643,6 @@
zooKeeperStateServer = null;
}
- statusHistoryRepository = createStatusHistoryRepository();
-
final boolean analyticsEnabled = Boolean.parseBoolean(nifiProperties.getProperty(NiFiProperties.ANALYTICS_PREDICTION_ENABLED, NiFiProperties.DEFAULT_ANALYTICS_PREDICTION_ENABLED));
if (analyticsEnabled) {
@@ -1185,22 +1188,6 @@
}
}
- private StatusHistoryRepository createStatusHistoryRepository() {
- final String implementationClassName = nifiProperties.getProperty(NiFiProperties.COMPONENT_STATUS_REPOSITORY_IMPLEMENTATION, DEFAULT_COMPONENT_STATUS_REPO_IMPLEMENTATION);
- if (implementationClassName == null) {
- throw new RuntimeException("Cannot create Status History Repository because the NiFi Properties is missing the following property: "
- + NiFiProperties.COMPONENT_STATUS_REPOSITORY_IMPLEMENTATION);
- }
-
- try {
- final StatusHistoryRepository repository = NarThreadContextClassLoader.createInstance(extensionManager, implementationClassName, StatusHistoryRepository.class, nifiProperties);
- repository.start();
- return repository;
- } catch (final Exception e) {
- throw new RuntimeException(e);
- }
- }
-
public KerberosConfig createKerberosConfig(final NiFiProperties nifiProperties) {
final String principal = nifiProperties.getKerberosServicePrincipal();
final String keytabLocation = nifiProperties.getKerberosServiceKeytabLocation();
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/JsonNodeStatusHistoryDump.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/JsonNodeStatusHistoryDump.java
new file mode 100644
index 0000000..7215a03
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/JsonNodeStatusHistoryDump.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.status.history;
+
+import com.fasterxml.jackson.core.util.DefaultIndenter;
+import com.fasterxml.jackson.core.util.DefaultPrettyPrinter;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+final class JsonNodeStatusHistoryDump implements StatusHistoryDump {
+
+ private final StatusHistory nodeStatusHistory;
+
+ JsonNodeStatusHistoryDump(final StatusHistory nodeStatusHistory) {
+ this.nodeStatusHistory = nodeStatusHistory;
+ }
+
+ @Override
+ public void writeTo(final OutputStream out) throws IOException {
+ final ObjectMapper objectMapper = new ObjectMapper();
+ final DefaultPrettyPrinter prettyPrinter = new DefaultPrettyPrinter();
+ prettyPrinter.indentArraysWith(DefaultIndenter.SYSTEM_LINEFEED_INSTANCE);
+ final StatusHistoryDTO statusHistoryDTO = StatusHistoryUtil.createStatusHistoryDTO(nodeStatusHistory);
+ objectMapper.writer(prettyPrinter).writeValue(out, statusHistoryDTO);
+ }
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/JsonNodeStatusHistoryDumpFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/JsonNodeStatusHistoryDumpFactory.java
new file mode 100644
index 0000000..fac963a
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/JsonNodeStatusHistoryDumpFactory.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.status.history;
+
+import com.google.common.base.Preconditions;
+
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneId;
+import java.util.Date;
+
+public class JsonNodeStatusHistoryDumpFactory implements StatusHistoryDumpFactory {
+
+ private StatusHistoryRepository statusHistoryRepository;
+
+ @Override
+ public StatusHistoryDump create(int days) {
+ Preconditions.checkArgument(days > 0, String.format("The number of days shall be greater than 0. The current value is %s.", days));
+ final LocalDateTime endOfToday = LocalDateTime.now().with(LocalTime.MAX);
+ final LocalDateTime startOfDaysBefore = endOfToday.minusDays(days).with(LocalTime.MIN);
+
+ final Date endOfTodayDate = Date.from(endOfToday.atZone(ZoneId.systemDefault()).toInstant());
+ final Date startOfDaysBeforeDate = Date.from(startOfDaysBefore.atZone(ZoneId.systemDefault()).toInstant());
+
+ final StatusHistory nodeStatusHistory = statusHistoryRepository.getNodeStatusHistory(startOfDaysBeforeDate, endOfTodayDate);
+ return new JsonNodeStatusHistoryDump(nodeStatusHistory);
+ }
+
+ public void setStatusHistoryRepository(StatusHistoryRepository statusHistoryRepository) {
+ this.statusHistoryRepository = statusHistoryRepository;
+ }
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/spring/FlowControllerFactoryBean.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/spring/FlowControllerFactoryBean.java
index c373b23..6e005a5 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/spring/FlowControllerFactoryBean.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/spring/FlowControllerFactoryBean.java
@@ -24,6 +24,7 @@
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.leader.election.LeaderElectionManager;
import org.apache.nifi.controller.repository.FlowFileEventRepository;
+import org.apache.nifi.controller.status.history.StatusHistoryRepository;
import org.apache.nifi.encrypt.PropertyEncryptor;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.registry.VariableRegistry;
@@ -55,6 +56,7 @@
private FlowRegistryClient flowRegistryClient;
private ExtensionManager extensionManager;
private RevisionManager revisionManager;
+ private StatusHistoryRepository statusHistoryRepository;
@Override
public Object getObject() throws Exception {
@@ -78,7 +80,8 @@
variableRegistry,
flowRegistryClient,
extensionManager,
- revisionManager);
+ revisionManager,
+ statusHistoryRepository);
} else {
flowController = FlowController.createStandaloneInstance(
flowFileEventRepository,
@@ -89,7 +92,8 @@
bulletinRepository,
variableRegistry,
flowRegistryClient,
- extensionManager);
+ extensionManager,
+ statusHistoryRepository);
}
}
@@ -157,4 +161,8 @@
public void setRevisionManager(final RevisionManager revisionManager) {
this.revisionManager = revisionManager;
}
+
+ public void setStatusHistoryRepository(StatusHistoryRepository statusHistoryRepository) {
+ this.statusHistoryRepository = statusHistoryRepository;
+ }
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/spring/StatusHistoryRepositoryFactoryBean.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/spring/StatusHistoryRepositoryFactoryBean.java
new file mode 100644
index 0000000..79e0deb
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/spring/StatusHistoryRepositoryFactoryBean.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.spring;
+
+import org.apache.nifi.controller.status.history.StatusHistoryRepository;
+import org.apache.nifi.nar.ExtensionManager;
+import org.apache.nifi.nar.NarThreadContextClassLoader;
+import org.apache.nifi.util.NiFiProperties;
+import org.springframework.beans.BeansException;
+import org.springframework.beans.factory.BeanCreationException;
+import org.springframework.beans.factory.FactoryBean;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
+
+/**
+ * Factory bean for creating a singleton StatusHistoryRepository instance.
+ */
+public class StatusHistoryRepositoryFactoryBean implements FactoryBean<StatusHistoryRepository>, ApplicationContextAware {
+
+ private static final String DEFAULT_COMPONENT_STATUS_REPO_IMPLEMENTATION = "org.apache.nifi.controller.status.history.VolatileComponentStatusRepository";
+
+ private ApplicationContext applicationContext;
+ private NiFiProperties nifiProperties;
+ private ExtensionManager extensionManager;
+ private StatusHistoryRepository statusHistoryRepository;
+
+ @Override
+ public StatusHistoryRepository getObject() throws Exception {
+ final String implementationClassName = nifiProperties.getProperty(NiFiProperties.COMPONENT_STATUS_REPOSITORY_IMPLEMENTATION, DEFAULT_COMPONENT_STATUS_REPO_IMPLEMENTATION);
+ if (implementationClassName == null) {
+ throw new BeanCreationException("Cannot create Status History Repository because the NiFi Properties is missing the following property: "
+ + NiFiProperties.COMPONENT_STATUS_REPOSITORY_IMPLEMENTATION);
+ }
+
+ try {
+ statusHistoryRepository = NarThreadContextClassLoader.createInstance(extensionManager, implementationClassName, StatusHistoryRepository.class, nifiProperties);
+ statusHistoryRepository.start();
+ return statusHistoryRepository;
+ } catch (final Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public Class<?> getObjectType() {
+ return StatusHistoryRepository.class;
+ }
+
+ @Override
+ public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
+ this.applicationContext = applicationContext;
+ }
+
+ public void setNifiProperties(NiFiProperties nifiProperties) {
+ this.nifiProperties = nifiProperties;
+ }
+
+ public void setExtensionManager(ExtensionManager extensionManager) {
+ this.extensionManager = extensionManager;
+ }
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/nifi-context.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/nifi-context.xml
index ef1810c..040329b 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/nifi-context.xml
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/nifi-context.xml
@@ -57,6 +57,7 @@
<property name="flowRegistryClient" ref="flowRegistryClient" />
<property name="extensionManager" ref="extensionManager" />
<property name="revisionManager" ref="revisionManager" />
+ <property name="statusHistoryRepository" ref="statusHistoryRepository" />
</bean>
<!-- flow service -->
@@ -77,4 +78,13 @@
<bean id="eventReporter" class="org.apache.nifi.events.StandardEventReporter">
<constructor-arg ref="bulletinRepository" />
</bean>
+
+ <bean id="statusHistoryRepository" class="org.apache.nifi.spring.StatusHistoryRepositoryFactoryBean">
+ <property name="nifiProperties" ref="nifiProperties"/>
+ <property name="extensionManager" ref="extensionManager" />
+ </bean>
+
+ <bean id="statusHistoryDumpFactory" class="org.apache.nifi.controller.status.history.JsonNodeStatusHistoryDumpFactory">
+ <property name="statusHistoryRepository" ref="statusHistoryRepository" />
+ </bean>
</beans>
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/StandardFlowServiceTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/StandardFlowServiceTest.java
index 1870841..3abe468 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/StandardFlowServiceTest.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/StandardFlowServiceTest.java
@@ -24,6 +24,7 @@
import org.apache.nifi.controller.serialization.FlowSerializationException;
import org.apache.nifi.controller.serialization.ScheduledStateLookup;
import org.apache.nifi.controller.serialization.StandardFlowSerializer;
+import org.apache.nifi.controller.status.history.StatusHistoryRepository;
import org.apache.nifi.encrypt.PropertyEncryptor;
import org.apache.nifi.events.VolatileBulletinRepository;
import org.apache.nifi.nar.ExtensionDiscoveringManager;
@@ -72,6 +73,7 @@
private RevisionManager revisionManager;
private VariableRegistry variableRegistry;
private ExtensionManager extensionManager;
+ private StatusHistoryRepository statusHistoryRepository;
@BeforeClass
public static void setupSuite() {
@@ -91,8 +93,9 @@
revisionManager = mock(RevisionManager.class);
extensionManager = mock(ExtensionDiscoveringManager.class);
flowController = FlowController.createStandaloneInstance(mockFlowFileEventRepository, properties, authorizer, mockAuditService, mockEncryptor,
- new VolatileBulletinRepository(), variableRegistry, mock(FlowRegistryClient.class), extensionManager);
+ new VolatileBulletinRepository(), variableRegistry, mock(FlowRegistryClient.class), extensionManager, statusHistoryRepository);
flowService = StandardFlowService.createStandaloneInstance(flowController, properties, mockEncryptor, revisionManager, authorizer);
+ statusHistoryRepository = mock(StatusHistoryRepository.class);
}
@Test
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/StandardProcessorNodeIT.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/StandardProcessorNodeIT.java
index bc468ff3..832050a 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/StandardProcessorNodeIT.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/StandardProcessorNodeIT.java
@@ -33,6 +33,7 @@
import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException;
import org.apache.nifi.controller.repository.FlowFileEventRepository;
import org.apache.nifi.controller.service.ControllerServiceNode;
+import org.apache.nifi.controller.status.history.StatusHistoryRepository;
import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.events.VolatileBulletinRepository;
import org.apache.nifi.expression.ExpressionLanguageCompiler;
@@ -231,7 +232,7 @@
final FlowController flowController = FlowController.createStandaloneInstance(mock(FlowFileEventRepository.class), nifiProperties,
mock(Authorizer.class), mock(AuditService.class), null, new VolatileBulletinRepository(),
new FileBasedVariableRegistry(nifiProperties.getVariableRegistryPropertiesPaths()),
- mock(FlowRegistryClient.class), extensionManager);
+ mock(FlowRegistryClient.class), extensionManager, mock(StatusHistoryRepository.class));
// Init processor
final PropertyDescriptor classpathProp = new PropertyDescriptor.Builder().name("Classpath Resources")
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java
index db6051b..c0c86bd 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java
@@ -44,6 +44,7 @@
import org.apache.nifi.controller.service.mock.DummyReportingTask;
import org.apache.nifi.controller.service.mock.ServiceA;
import org.apache.nifi.controller.service.mock.ServiceB;
+import org.apache.nifi.controller.status.history.StatusHistoryRepository;
import org.apache.nifi.encrypt.PropertyEncryptor;
import org.apache.nifi.encrypt.PropertyEncryptorFactory;
import org.apache.nifi.groups.ProcessGroup;
@@ -79,7 +80,6 @@
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import org.mockito.Mockito;
import java.io.File;
import java.io.FileInputStream;
@@ -124,12 +124,13 @@
private BulletinRepository bulletinRepo;
private VariableRegistry variableRegistry;
private ExtensionDiscoveringManager extensionManager;
+ private StatusHistoryRepository statusHistoryRepository;
@Before
public void setup() {
- flowFileEventRepo = Mockito.mock(FlowFileEventRepository.class);
- auditService = Mockito.mock(AuditService.class);
+ flowFileEventRepo = mock(FlowFileEventRepository.class);
+ auditService = mock(AuditService.class);
final Map<String, String> otherProps = new HashMap<>();
otherProps.put(NiFiProperties.PROVENANCE_REPO_IMPLEMENTATION_CLASS, MockProvenanceRepository.class.getName());
otherProps.put("nifi.remote.input.socket.port", "");
@@ -143,6 +144,8 @@
extensionManager = new StandardExtensionDiscoveringManager();
extensionManager.discoverExtensions(systemBundle, Collections.emptySet());
+ statusHistoryRepository = mock(StatusHistoryRepository.class);
+
User user1 = new User.Builder().identifier("user-id-1").identity("user-1").build();
User user2 = new User.Builder().identifier("user-id-2").identity("user-2").build();
@@ -182,9 +185,9 @@
authorizer = new MockPolicyBasedAuthorizer(groups1, users1, policies1);
variableRegistry = new FileBasedVariableRegistry(nifiProperties.getVariableRegistryPropertiesPaths());
- bulletinRepo = Mockito.mock(BulletinRepository.class);
+ bulletinRepo = mock(BulletinRepository.class);
controller = FlowController.createStandaloneInstance(flowFileEventRepo, nifiProperties, authorizer,
- auditService, encryptor, bulletinRepo, variableRegistry, Mockito.mock(FlowRegistryClient.class), extensionManager);
+ auditService, encryptor, bulletinRepo, variableRegistry, mock(FlowRegistryClient.class), extensionManager, statusHistoryRepository);
}
@After
@@ -205,7 +208,7 @@
final DataFlow proposedDataFlow = new StandardDataFlow(flow.getBytes(StandardCharsets.UTF_8), null, authFingerprint.getBytes(StandardCharsets.UTF_8), Collections.emptySet());
- controller.synchronize(standardFlowSynchronizer, proposedDataFlow, Mockito.mock(FlowService.class));
+ controller.synchronize(standardFlowSynchronizer, proposedDataFlow, mock(FlowService.class));
// should be two controller services
final Set<ControllerServiceNode> controllerServiceNodes = controller.getFlowManager().getAllControllerServices();
@@ -267,7 +270,7 @@
final DataFlow proposedDataFlow = new StandardDataFlow(flow.getBytes(StandardCharsets.UTF_8), null, authFingerprint.getBytes(StandardCharsets.UTF_8), Collections.emptySet());
- controller.synchronize(standardFlowSynchronizer, proposedDataFlow, Mockito.mock(FlowService.class));
+ controller.synchronize(standardFlowSynchronizer, proposedDataFlow, mock(FlowService.class));
try {
// should be two controller services
@@ -287,10 +290,10 @@
// verify the processor is still pointing at the controller service that got moved to the root group
final ProcessorNode processorNode = processorNodes.stream().findFirst().get();
final PropertyDescriptor procControllerServiceProp = processorNode.getRawPropertyValues().entrySet().stream()
- .filter(e -> e.getValue().equals(rootGroupCs.getIdentifier()))
- .map(Map.Entry::getKey)
- .findFirst()
- .get();
+ .filter(e -> e.getValue().equals(rootGroupCs.getIdentifier()))
+ .map(Map.Entry::getKey)
+ .findFirst()
+ .get();
assertNotNull(procControllerServiceProp);
} finally {
purgeFlow();
@@ -304,10 +307,10 @@
// create a mock proposed data flow with the same auth fingerprint as the current authorizer
final String authFingerprint = authorizer.getFingerprint();
- final DataFlow proposedDataFlow = Mockito.mock(DataFlow.class);
+ final DataFlow proposedDataFlow = mock(DataFlow.class);
when(proposedDataFlow.getAuthorizerFingerprint()).thenReturn(authFingerprint.getBytes(StandardCharsets.UTF_8));
- controller.synchronize(standardFlowSynchronizer, proposedDataFlow, Mockito.mock(FlowService.class));
+ controller.synchronize(standardFlowSynchronizer, proposedDataFlow, mock(FlowService.class));
assertEquals(authFingerprint, authorizer.getFingerprint());
}
@@ -323,11 +326,11 @@
final String authFingerprint = "<authorizations></authorizations>";
final DataFlow proposedDataFlow = new StandardDataFlow(flow.getBytes(StandardCharsets.UTF_8), null, authFingerprint.getBytes(StandardCharsets.UTF_8), Collections.emptySet());
- controller.synchronize(standardFlowSynchronizer, proposedDataFlow, Mockito.mock(FlowService.class));
+ controller.synchronize(standardFlowSynchronizer, proposedDataFlow, mock(FlowService.class));
controller.initializeFlow();
try {
- controller.synchronize(standardFlowSynchronizer, proposedDataFlow, Mockito.mock(FlowService.class));
+ controller.synchronize(standardFlowSynchronizer, proposedDataFlow, mock(FlowService.class));
assertNotEquals(authFingerprint, authorizer.getFingerprint());
} finally {
purgeFlow();
@@ -346,7 +349,7 @@
final DataFlow proposedDataFlow = new StandardDataFlow(flow.getBytes(StandardCharsets.UTF_8), null, authFingerprint.getBytes(StandardCharsets.UTF_8), Collections.emptySet());
try {
- controller.synchronize(standardFlowSynchronizer, proposedDataFlow, Mockito.mock(FlowService.class));
+ controller.synchronize(standardFlowSynchronizer, proposedDataFlow, mock(FlowService.class));
controller.initializeFlow();
} finally {
purgeFlow();
@@ -365,7 +368,7 @@
final DataFlow proposedDataFlow = new StandardDataFlow(flow.getBytes(StandardCharsets.UTF_8), null, authFingerprint.getBytes(StandardCharsets.UTF_8), Collections.emptySet());
try {
- controller.synchronize(standardFlowSynchronizer, proposedDataFlow, Mockito.mock(FlowService.class));
+ controller.synchronize(standardFlowSynchronizer, proposedDataFlow, mock(FlowService.class));
controller.initializeFlow();
ParameterContext parameterContext = controller.getFlowManager().getParameterContextManager().getParameterContext("context");
@@ -390,7 +393,7 @@
final DataFlow proposedDataFlow = new StandardDataFlow(flow.getBytes(StandardCharsets.UTF_8), null, authFingerprint.getBytes(StandardCharsets.UTF_8), Collections.emptySet());
try {
- controller.synchronize(standardFlowSynchronizer, proposedDataFlow, Mockito.mock(FlowService.class));
+ controller.synchronize(standardFlowSynchronizer, proposedDataFlow, mock(FlowService.class));
controller.initializeFlow();
final Map<String, Parameter> parameters = new HashMap<>();
@@ -436,10 +439,10 @@
// create a mock proposed data flow with different auth fingerprint as the current authorizer
final String authFingerprint = "<authorizations></authorizations>";
- final DataFlow proposedDataFlow = Mockito.mock(DataFlow.class);
+ final DataFlow proposedDataFlow = mock(DataFlow.class);
when(proposedDataFlow.getAuthorizerFingerprint()).thenReturn(authFingerprint.getBytes(StandardCharsets.UTF_8));
- controller.synchronize(standardFlowSynchronizer, proposedDataFlow, Mockito.mock(FlowService.class));
+ controller.synchronize(standardFlowSynchronizer, proposedDataFlow, mock(FlowService.class));
assertNotEquals(authFingerprint, authorizer.getFingerprint());
assertTrue(authorizer.getGroups().isEmpty());
@@ -457,14 +460,14 @@
final String authFingerprint = "<authorizations></authorizations>";
final DataFlow proposedDataFlow = new StandardDataFlow(flow.getBytes(StandardCharsets.UTF_8), null, authFingerprint.getBytes(StandardCharsets.UTF_8), Collections.emptySet());
- controller.synchronize(standardFlowSynchronizer, proposedDataFlow, Mockito.mock(FlowService.class));
+ controller.synchronize(standardFlowSynchronizer, proposedDataFlow, mock(FlowService.class));
controller.initializeFlow();
final DataFlow dataflowWithNullAuthorizations = new StandardDataFlow(flow.getBytes(StandardCharsets.UTF_8), null, null, Collections.emptySet());
try {
- controller.synchronize(standardFlowSynchronizer, dataflowWithNullAuthorizations, Mockito.mock(FlowService.class));
+ controller.synchronize(standardFlowSynchronizer, dataflowWithNullAuthorizations, mock(FlowService.class));
Assert.fail("Was able to synchronize controller with null authorizations but dataflow wasn't empty");
} catch (final UninheritableFlowException expected) {
// expected
@@ -478,10 +481,10 @@
final FlowSynchronizer standardFlowSynchronizer = new StandardFlowSynchronizer(
PropertyEncryptorFactory.getPropertyEncryptor(nifiProperties), nifiProperties, extensionManager);
- final DataFlow proposedDataFlow = Mockito.mock(DataFlow.class);
+ final DataFlow proposedDataFlow = mock(DataFlow.class);
when(proposedDataFlow.getAuthorizerFingerprint()).thenReturn(null);
- controller.synchronize(standardFlowSynchronizer, proposedDataFlow, Mockito.mock(FlowService.class));
+ controller.synchronize(standardFlowSynchronizer, proposedDataFlow, mock(FlowService.class));
assertTrue(authorizer.getGroups().isEmpty());
assertTrue(authorizer.getUsers().isEmpty());
@@ -512,7 +515,7 @@
// create a mock proposed data flow with the same auth fingerprint as the current authorizer
final String authFingerprint = authorizer.getFingerprint();
- final DataFlow proposedDataFlow = Mockito.mock(DataFlow.class);
+ final DataFlow proposedDataFlow = mock(DataFlow.class);
when(proposedDataFlow.getAuthorizerFingerprint()).thenReturn(authFingerprint.getBytes(StandardCharsets.UTF_8));
authorizer = new MockPolicyBasedAuthorizer();
@@ -520,8 +523,8 @@
controller.shutdown(true);
controller = FlowController.createStandaloneInstance(flowFileEventRepo, nifiProperties, authorizer,
- auditService, encryptor, bulletinRepo, variableRegistry, Mockito.mock(FlowRegistryClient.class), extensionManager);
- controller.synchronize(standardFlowSynchronizer, proposedDataFlow, Mockito.mock(FlowService.class));
+ auditService, encryptor, bulletinRepo, variableRegistry, mock(FlowRegistryClient.class), extensionManager, statusHistoryRepository);
+ controller.synchronize(standardFlowSynchronizer, proposedDataFlow, mock(FlowService.class));
assertEquals(authFingerprint, authorizer.getFingerprint());
}
@@ -534,11 +537,11 @@
missingComponents.add("1");
missingComponents.add("2");
- final DataFlow proposedDataFlow = Mockito.mock(DataFlow.class);
+ final DataFlow proposedDataFlow = mock(DataFlow.class);
when(proposedDataFlow.getMissingComponents()).thenReturn(missingComponents);
try {
- controller.synchronize(standardFlowSynchronizer, proposedDataFlow, Mockito.mock(FlowService.class));
+ controller.synchronize(standardFlowSynchronizer, proposedDataFlow, mock(FlowService.class));
Assert.fail("Should have thrown exception");
} catch (UninheritableFlowException e) {
assertTrue(e.getMessage().contains("Proposed flow has missing components that are not considered missing in the current flow (1,2)"));
@@ -568,7 +571,7 @@
final SnippetManager mockSnippetManager = mock(SnippetManager.class);
when(mockSnippetManager.export()).thenReturn(new byte[0]);
- final FlowManager flowManager = Mockito.mock(FlowManager.class);
+ final FlowManager flowManager = mock(FlowManager.class);
final FlowController mockFlowController = mock(FlowController.class);
when(mockFlowController.getFlowManager()).thenReturn(flowManager);
@@ -579,11 +582,11 @@
when(mockFlowController.getAuthorizer()).thenReturn(authorizer);
when(mockFlowController.getSnippetManager()).thenReturn(mockSnippetManager);
- final DataFlow proposedDataFlow = Mockito.mock(DataFlow.class);
+ final DataFlow proposedDataFlow = mock(DataFlow.class);
when(proposedDataFlow.getMissingComponents()).thenReturn(new HashSet<>());
try {
- standardFlowSynchronizer.sync(mockFlowController, proposedDataFlow, encryptor, Mockito.mock(FlowService.class));
+ standardFlowSynchronizer.sync(mockFlowController, proposedDataFlow, encryptor, mock(FlowService.class));
Assert.fail("Should have thrown exception");
} catch (UninheritableFlowException e) {
assertTrue(e.getMessage(), e.getMessage().contains("Current flow has missing components that are not considered missing in the proposed flow (1,2,3)"));
@@ -636,7 +639,7 @@
final byte[] authFingerprintBytes = authFingerprint.getBytes(StandardCharsets.UTF_8);
final DataFlow proposedDataFlow1 = new StandardDataFlow(flowBytes, null, authFingerprintBytes, Collections.emptySet());
- controller.synchronize(standardFlowSynchronizer, proposedDataFlow1, Mockito.mock(FlowService.class));
+ controller.synchronize(standardFlowSynchronizer, proposedDataFlow1, mock(FlowService.class));
}
@Test
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/reporting/TestStandardReportingContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/reporting/TestStandardReportingContext.java
index 11a0ad5..a14a730 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/reporting/TestStandardReportingContext.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/reporting/TestStandardReportingContext.java
@@ -31,6 +31,7 @@
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.repository.FlowFileEventRepository;
+import org.apache.nifi.controller.status.history.StatusHistoryRepository;
import org.apache.nifi.encrypt.PropertyEncryptor;
import org.apache.nifi.encrypt.PropertyEncryptorFactory;
import org.apache.nifi.nar.ExtensionDiscoveringManager;
@@ -69,6 +70,7 @@
private BulletinRepository bulletinRepo;
private VariableRegistry variableRegistry;
private FlowRegistryClient flowRegistry;
+ private StatusHistoryRepository statusHistoryRepository;
private volatile String propsFile = TestStandardReportingContext.class.getResource("/flowcontrollertest.nifi.properties").getFile();
@Before
@@ -87,6 +89,8 @@
extensionManager = new StandardExtensionDiscoveringManager();
extensionManager.discoverExtensions(systemBundle, Collections.emptySet());
+ statusHistoryRepository = Mockito.mock(StatusHistoryRepository.class);
+
User user1 = new User.Builder().identifier("user-id-1").identity("user-1").build();
User user2 = new User.Builder().identifier("user-id-2").identity("user-2").build();
@@ -129,7 +133,7 @@
bulletinRepo = Mockito.mock(BulletinRepository.class);
controller = FlowController.createStandaloneInstance(flowFileEventRepo, nifiProperties, authorizer, auditService, encryptor,
- bulletinRepo, variableRegistry, flowRegistry, extensionManager);
+ bulletinRepo, variableRegistry, flowRegistry, extensionManager, statusHistoryRepository);
}
@After
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/ProcessorLifecycleIT.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/ProcessorLifecycleIT.java
index 1c7f187..b2f38fe 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/ProcessorLifecycleIT.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/ProcessorLifecycleIT.java
@@ -35,6 +35,7 @@
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.controller.repository.FlowFileEventRepository;
+import org.apache.nifi.controller.status.history.StatusHistoryRepository;
import org.apache.nifi.events.VolatileBulletinRepository;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.nar.ExtensionDiscoveringManager;
@@ -558,9 +559,9 @@
extensionManager.discoverExtensions(systemBundle, Collections.emptySet());
final FlowController flowController = FlowController.createStandaloneInstance(mock(FlowFileEventRepository.class), nifiProperties,
- mock(Authorizer.class), mock(AuditService.class), null, new VolatileBulletinRepository(),
- new FileBasedVariableRegistry(nifiProperties.getVariableRegistryPropertiesPaths()),
- mock(FlowRegistryClient.class), extensionManager);
+ mock(Authorizer.class), mock(AuditService.class), null, new VolatileBulletinRepository(),
+ new FileBasedVariableRegistry(nifiProperties.getVariableRegistryPropertiesPaths()),
+ mock(FlowRegistryClient.class), extensionManager, mock(StatusHistoryRepository.class));
final FlowManager flowManager = flowController.getFlowManager();
this.processScheduler = flowController.getProcessScheduler();
@@ -593,9 +594,11 @@
}
/**
+ *
*/
public static class TestProcessor extends AbstractProcessor {
- private static final Runnable NOP = () -> {};
+ private static final Runnable NOP = () -> {
+ };
private Runnable onScheduleCallback = NOP;
private Runnable onUnscheduleCallback = NOP;
@@ -613,8 +616,7 @@
private final List<String> operationNames = new LinkedList<>();
- void setScenario(Runnable onScheduleCallback, Runnable onUnscheduleCallback, Runnable onStopCallback,
- Runnable onTriggerCallback) {
+ void setScenario(Runnable onScheduleCallback, Runnable onUnscheduleCallback, Runnable onStopCallback, Runnable onTriggerCallback) {
this.onScheduleCallback = onScheduleCallback;
this.onUnscheduleCallback = onUnscheduleCallback;
this.onStopCallback = onStopCallback;
@@ -731,6 +733,7 @@
this.delayLimit = delayLimit;
this.randomDelay = randomDelay;
}
+
Random random = new Random();
@Override
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/serialization/StandardFlowSerializerTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/serialization/StandardFlowSerializerTest.java
index 4129f12..377778d 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/serialization/StandardFlowSerializerTest.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/serialization/StandardFlowSerializerTest.java
@@ -25,6 +25,7 @@
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.repository.FlowFileEventRepository;
+import org.apache.nifi.controller.status.history.StatusHistoryRepository;
import org.apache.nifi.encrypt.PropertyEncryptor;
import org.apache.nifi.encrypt.PropertyEncryptorFactory;
import org.apache.nifi.nar.ExtensionDiscoveringManager;
@@ -100,7 +101,7 @@
final BulletinRepository bulletinRepo = Mockito.mock(BulletinRepository.class);
controller = FlowController.createStandaloneInstance(flowFileEventRepo, nifiProperties, authorizer,
- auditService, encryptor, bulletinRepo, variableRegistry, Mockito.mock(FlowRegistryClient.class), extensionManager);
+ auditService, encryptor, bulletinRepo, variableRegistry, Mockito.mock(FlowRegistryClient.class), extensionManager, Mockito.mock(StatusHistoryRepository.class));
serializer = new StandardFlowSerializer(encryptor);
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/history/JsonNodeStatusHistoryDumpFactoryTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/history/JsonNodeStatusHistoryDumpFactoryTest.java
new file mode 100644
index 0000000..fc331bf
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/history/JsonNodeStatusHistoryDumpFactoryTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.status.history;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneId;
+import java.util.Date;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+public class JsonNodeStatusHistoryDumpFactoryTest {
+
+ private static final String EXPECTED_EXCEPTION_MESSAGE = "The number of days shall be greater than 0. The current value is %s.";
+
+ @Test
+ public void testJsonNodeStatusDumpFactory() {
+ final int days = 3;
+ final StatusHistoryRepository statusHistoryRepository = mock(StatusHistoryRepository.class);
+ final ArgumentCaptor<Date> fromArgumentCaptor = ArgumentCaptor.forClass(Date.class);
+ final ArgumentCaptor<Date> toArgumentCaptor = ArgumentCaptor.forClass(Date.class);
+
+ JsonNodeStatusHistoryDumpFactory factory = new JsonNodeStatusHistoryDumpFactory();
+ factory.setStatusHistoryRepository(statusHistoryRepository);
+
+ factory.create(days);
+
+ verify(statusHistoryRepository).getNodeStatusHistory(fromArgumentCaptor.capture(), toArgumentCaptor.capture());
+
+ final LocalDateTime endOfToday = LocalDateTime.now().with(LocalTime.MAX);
+ final LocalDateTime startOfDaysBefore = endOfToday.minusDays(days).with(LocalTime.MIN);
+
+ final Date endOfTodayDate = Date.from(endOfToday.atZone(ZoneId.systemDefault()).toInstant());
+ final Date startOfDaysBeforeDate = Date.from(startOfDaysBefore.atZone(ZoneId.systemDefault()).toInstant());
+
+ assertEquals(endOfTodayDate, toArgumentCaptor.getValue());
+ assertEquals(startOfDaysBeforeDate, fromArgumentCaptor.getValue());
+ }
+
+ @Test
+ public void testJsonNodeStatusDumpFactoryWithLessThanOneDayThrowsException() {
+ final int zeroDays = 0;
+ final int negativeDays = -1;
+ final StatusHistoryRepository statusHistoryRepository = mock(StatusHistoryRepository.class);
+
+ JsonNodeStatusHistoryDumpFactory factory = new JsonNodeStatusHistoryDumpFactory();
+ factory.setStatusHistoryRepository(statusHistoryRepository);
+
+ final IllegalArgumentException zeroDaysException = Assert.assertThrows(IllegalArgumentException.class,
+ () -> factory.create(zeroDays)
+ );
+
+ assertEquals(String.format(EXPECTED_EXCEPTION_MESSAGE, zeroDays), zeroDaysException.getMessage());
+
+ final IllegalArgumentException negativeDaysException = Assert.assertThrows(IllegalArgumentException.class,
+ () -> factory.create(negativeDays)
+ );
+
+ assertEquals(String.format(EXPECTED_EXCEPTION_MESSAGE, negativeDays), negativeDaysException.getMessage());
+ }
+
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/FrameworkIntegrationTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/FrameworkIntegrationTest.java
index f5505b6..3f4e7d6 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/FrameworkIntegrationTest.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/FrameworkIntegrationTest.java
@@ -157,6 +157,7 @@
private Bundle systemBundle;
private ClusterCoordinator clusterCoordinator;
private NiFiProperties nifiProperties;
+ private StatusHistoryRepository statusHistoryRepository;
public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").build();
@@ -231,6 +232,8 @@
systemBundle = SystemBundle.create(nifiProperties);
extensionManager.discoverExtensions(systemBundle, Collections.emptySet());
+ statusHistoryRepository = Mockito.mock(StatusHistoryRepository.class);
+
final PropertyEncryptor encryptor = createEncryptor();
final Authorizer authorizer = new AlwaysAuthorizedAuthorizer();
final AuditService auditService = new NopAuditService();
@@ -266,8 +269,9 @@
Mockito.when(clusterCoordinator.getNodeIdentifiers()).thenReturn(nodeIdentifiers);
Mockito.when(clusterCoordinator.getLocalNodeIdentifier()).thenReturn(localNodeId);
- flowController = FlowController.createClusteredInstance(flowFileEventRepository, nifiProperties, authorizer, auditService, encryptor, protocolSender, bulletinRepo, clusterCoordinator,
- heartbeatMonitor, leaderElectionManager, VariableRegistry.ENVIRONMENT_SYSTEM_REGISTRY, flowRegistryClient, extensionManager, Mockito.mock(RevisionManager.class));
+ flowController = FlowController.createClusteredInstance(flowFileEventRepository, nifiProperties, authorizer, auditService, encryptor, protocolSender,
+ bulletinRepo, clusterCoordinator, heartbeatMonitor, leaderElectionManager, VariableRegistry.ENVIRONMENT_SYSTEM_REGISTRY, flowRegistryClient,
+ extensionManager, Mockito.mock(RevisionManager.class), statusHistoryRepository);
flowController.setClustered(true, UUID.randomUUID().toString());
flowController.setNodeId(localNodeId);
@@ -275,7 +279,7 @@
flowController.setConnectionStatus(new NodeConnectionStatus(localNodeId, NodeConnectionState.CONNECTED));
} else {
flowController = FlowController.createStandaloneInstance(flowFileEventRepository, nifiProperties, authorizer, auditService, encryptor, bulletinRepo,
- VariableRegistry.ENVIRONMENT_SYSTEM_REGISTRY, flowRegistryClient, extensionManager);
+ VariableRegistry.ENVIRONMENT_SYSTEM_REGISTRY, flowRegistryClient, extensionManager, statusHistoryRepository);
}
processScheduler = new StandardProcessScheduler(flowEngine, flowController, encryptor, flowController.getStateManagerProvider(), nifiProperties);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-headless-server/src/main/java/org/apache/nifi/headless/HeadlessNiFiServer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-headless-server/src/main/java/org/apache/nifi/headless/HeadlessNiFiServer.java
index c2b5143..1118995 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-headless-server/src/main/java/org/apache/nifi/headless/HeadlessNiFiServer.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-headless-server/src/main/java/org/apache/nifi/headless/HeadlessNiFiServer.java
@@ -34,6 +34,8 @@
import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.controller.repository.FlowFileEventRepository;
import org.apache.nifi.controller.repository.metrics.RingBufferEventRepository;
+import org.apache.nifi.controller.status.history.StatusHistoryDumpFactory;
+import org.apache.nifi.controller.status.history.StatusHistoryRepository;
import org.apache.nifi.diagnostics.DiagnosticsDump;
import org.apache.nifi.diagnostics.DiagnosticsDumpElement;
import org.apache.nifi.diagnostics.DiagnosticsFactory;
@@ -51,6 +53,7 @@
import org.apache.nifi.registry.variable.FileBasedVariableRegistry;
import org.apache.nifi.reporting.BulletinRepository;
import org.apache.nifi.services.FlowService;
+import org.apache.nifi.spring.StatusHistoryRepositoryFactoryBean;
import org.apache.nifi.util.FlowParser;
import org.apache.nifi.util.NiFiProperties;
import org.slf4j.Logger;
@@ -64,6 +67,7 @@
import java.util.Set;
/**
+ *
*/
public class HeadlessNiFiServer implements NiFiServer {
@@ -128,6 +132,11 @@
StandardFlowRegistryClient flowRegistryClient = new StandardFlowRegistryClient();
flowRegistryClient.setProperties(props);
+ final StatusHistoryRepositoryFactoryBean statusHistoryRepositoryFactoryBean = new StatusHistoryRepositoryFactoryBean();
+ statusHistoryRepositoryFactoryBean.setNifiProperties(props);
+ statusHistoryRepositoryFactoryBean.setExtensionManager(extensionManager);
+ StatusHistoryRepository statusHistoryRepository = statusHistoryRepositoryFactoryBean.getObject();
+
flowController = FlowController.createStandaloneInstance(
flowFileEventRepository,
props,
@@ -137,7 +146,8 @@
bulletinRepository,
variableRegistry,
flowRegistryClient,
- extensionManager);
+ extensionManager,
+ statusHistoryRepository);
flowService = StandardFlowService.createStandaloneInstance(
flowController,
@@ -199,6 +209,11 @@
return null;
}
+ @Override
+ public StatusHistoryDumpFactory getStatusHistoryDumpFactory() {
+ return null;
+ }
+
public void stop() {
try {
flowService.stop(false);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/bin/nifi.sh b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/bin/nifi.sh
index 6d36bdf..f70e896 100755
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/bin/nifi.sh
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/bin/nifi.sh
@@ -455,7 +455,7 @@
install "$@"
;;
- start|stop|decommission|run|status|is_loaded|dump|diagnostics|env|stateless|set-sensitive-properties-key|set-single-user-credentials)
+ start|stop|decommission|run|status|is_loaded|dump|diagnostics|status-history|env|stateless|set-sensitive-properties-key|set-single-user-credentials)
main "$@"
;;
@@ -465,6 +465,6 @@
run "start"
;;
*)
- echo "Usage nifi {start|stop|decommission|run|restart|status|dump|diagnostics|install|stateless|set-sensitive-properties-key|set-single-user-credentials}"
+ echo "Usage nifi {start|stop|decommission|run|restart|status|dump|diagnostics|status-history|install|stateless|set-sensitive-properties-key|set-single-user-credentials}"
;;
esac
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/BootstrapListener.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/BootstrapListener.java
index b9b12ec..574ea29 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/BootstrapListener.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/BootstrapListener.java
@@ -17,6 +17,7 @@
package org.apache.nifi;
import org.apache.nifi.controller.DecommissionTask;
+import org.apache.nifi.controller.status.history.StatusHistoryDump;
import org.apache.nifi.diagnostics.DiagnosticsDump;
import org.apache.nifi.util.LimitingInputStream;
import org.slf4j.Logger;
@@ -72,7 +73,7 @@
listenThread.start();
logger.debug("Notifying Bootstrap that local port is {}", localPort);
- sendCommand("PORT", new String[] { String.valueOf(localPort), secretKey});
+ sendCommand("PORT", new String[]{String.valueOf(localPort), secretKey});
}
public void reload() throws IOException {
@@ -94,7 +95,7 @@
public void sendStartedStatus(boolean status) throws IOException {
logger.debug("Notifying Bootstrap that the status of starting NiFi is {}", status);
- sendCommand("STARTED", new String[]{ String.valueOf(status) });
+ sendCommand("STARTED", new String[]{String.valueOf(status)});
}
private void sendCommand(final String command, final String[] args) throws IOException {
@@ -238,6 +239,12 @@
writeDiagnostics(socket.getOutputStream(), verbose);
break;
+ case STATUS_HISTORY:
+ logger.info("Received STATUS_HISTORY request from Bootstrap");
+ final String[] statusHistoryArgs = request.getArgs();
+ final int days = Integer.parseInt(statusHistoryArgs[0]);
+ writeNodeStatusHistory(socket.getOutputStream(), days);
+ break;
case IS_LOADED:
logger.debug("Received IS_LOADED request from Bootstrap");
String answer = String.valueOf(nifiLoaded);
@@ -282,6 +289,11 @@
diagnosticsDump.writeTo(out);
}
+ private void writeNodeStatusHistory(final OutputStream out, final int days) throws IOException {
+ final StatusHistoryDump statusHistoryDump = nifi.getServer().getStatusHistoryDumpFactory().create(days);
+ statusHistoryDump.writeTo(out);
+ }
+
private void sendAnswer(final OutputStream out, final String answer) throws IOException {
out.write((answer + "\n").getBytes(StandardCharsets.UTF_8));
out.flush();
@@ -333,7 +345,8 @@
DIAGNOSTICS,
DECOMMISSION,
PING,
- IS_LOADED
+ IS_LOADED,
+ STATUS_HISTORY
}
private final RequestType requestType;
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-jetty/src/main/java/org/apache/nifi/web/server/JettyServer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-jetty/src/main/java/org/apache/nifi/web/server/JettyServer.java
index c70f6b9..72efa0d 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-jetty/src/main/java/org/apache/nifi/web/server/JettyServer.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-jetty/src/main/java/org/apache/nifi/web/server/JettyServer.java
@@ -27,6 +27,7 @@
import org.apache.nifi.controller.UninheritableFlowException;
import org.apache.nifi.controller.serialization.FlowSerializationException;
import org.apache.nifi.controller.serialization.FlowSynchronizationException;
+import org.apache.nifi.controller.status.history.StatusHistoryDumpFactory;
import org.apache.nifi.diagnostics.DiagnosticsDump;
import org.apache.nifi.diagnostics.DiagnosticsDumpElement;
import org.apache.nifi.diagnostics.DiagnosticsFactory;
@@ -165,6 +166,7 @@
private DiagnosticsFactory diagnosticsFactory;
private SslContextFactory.Server sslContextFactory;
private DecommissionTask decommissionTask;
+ private StatusHistoryDumpFactory statusHistoryDumpFactory;
private WebAppContext webApiContext;
private WebAppContext webDocsContext;
@@ -631,7 +633,7 @@
XSSProtectionFilter.class,
XContentTypeOptionsFilter.class));
- if(props.isHTTPSConfigured()) {
+ if (props.isHTTPSConfigured()) {
filters.add(StrictTransportSecurityFilter.class);
}
filters.forEach((filter) -> addFilters(filter, webappContext));
@@ -708,7 +710,7 @@
* @param webAppContext context to which filters will be added
* @param props the {@link NiFiProperties}
*/
- private static void addDenialOfServiceFilters(final WebAppContext webAppContext, final NiFiProperties props) {
+ private static void addDenialOfServiceFilters(final WebAppContext webAppContext, final NiFiProperties props) {
addWebRequestLimitingFilter(webAppContext, props.getMaxWebRequestsPerSecond(), getWebRequestTimeoutMs(props), props.getWebRequestIpWhitelist());
// Only add the ContentLengthFilter if the property is explicitly set (empty by default)
@@ -737,10 +739,10 @@
* In order to allow clients to make more requests than the maximum rate, clients can be added to the {@code ipWhitelist}.
* The {@code requestTimeoutInMilliseconds} value limits requests to the given request timeout amount, and will close connections that run longer than this time.
*
- * @param webAppContext Web Application Context where Filter will be added
+ * @param webAppContext Web Application Context where Filter will be added
* @param maxRequestsPerSec Maximum number of allowed requests per second
- * @param maxRequestMs Maximum amount of time in milliseconds before a connection will be automatically closed
- * @param allowed Comma-separated string of IP addresses that should not be rate limited. Does not apply to request timeout
+ * @param maxRequestMs Maximum amount of time in milliseconds before a connection will be automatically closed
+ * @param allowed Comma-separated string of IP addresses that should not be rate limited. Does not apply to request timeout
*/
private static void addWebRequestLimitingFilter(final WebAppContext webAppContext, final int maxRequestsPerSec, final long maxRequestMs, final String allowed) {
final FilterHolder holder = new FilterHolder(DoSFilter.class);
@@ -905,6 +907,7 @@
/**
* Configures a KeyStoreScanner and TrustStoreScanner at the configured reload intervals. This will
* reload the SSLContextFactory if any changes are detected to the keystore or truststore.
+ *
* @param server The Jetty server
*/
private void configureSslContextFactoryReloading(Server server) {
@@ -1185,6 +1188,7 @@
diagnosticsFactory = webApplicationContext.getBean("diagnosticsFactory", DiagnosticsFactory.class);
decommissionTask = webApplicationContext.getBean("decommissionTask", DecommissionTask.class);
+ statusHistoryDumpFactory = webApplicationContext.getBean("statusHistoryDumpFactory", StatusHistoryDumpFactory.class);
}
// ensure the web document war was loaded and provide the extension mapping
@@ -1263,6 +1267,12 @@
return decommissionTask;
}
+ @Override
+ public StatusHistoryDumpFactory getStatusHistoryDumpFactory() {
+ return statusHistoryDumpFactory;
+ }
+
+
private void performInjectionForComponentUis(final Collection<WebAppContext> componentUiExtensionWebContexts,
final NiFiWebConfigurationContext configurationContext, final FilterHolder securityFilter) {
if (CollectionUtils.isNotEmpty(componentUiExtensionWebContexts)) {
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
index 75b1f0a..27e75be 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
@@ -620,6 +620,7 @@
<exclude>src/test/resources/TestLookupRecord/lookup-array-output.json</exclude>
<exclude>src/test/resources/TestValidateRecord/int-maps-schema.avsc</exclude>
<exclude>src/test/resources/TestValidateRecord/int-maps-data.json</exclude>
+ <exclude>src/test/resources/TestValidateRecord/array-and-map-with-null-element.avro</exclude>
</excludes>
</configuration>
</plugin>
diff --git a/nifi-server-api/src/main/java/org/apache/nifi/NiFiServer.java b/nifi-server-api/src/main/java/org/apache/nifi/NiFiServer.java
index cc87079..70a0f5e 100644
--- a/nifi-server-api/src/main/java/org/apache/nifi/NiFiServer.java
+++ b/nifi-server-api/src/main/java/org/apache/nifi/NiFiServer.java
@@ -18,6 +18,7 @@
import org.apache.nifi.bundle.Bundle;
import org.apache.nifi.controller.DecommissionTask;
+import org.apache.nifi.controller.status.history.StatusHistoryDumpFactory;
import org.apache.nifi.diagnostics.DiagnosticsFactory;
import org.apache.nifi.nar.ExtensionMapping;
import org.apache.nifi.util.NiFiProperties;
@@ -40,4 +41,6 @@
DiagnosticsFactory getThreadDumpFactory();
DecommissionTask getDecommissionTask();
+
+ StatusHistoryDumpFactory getStatusHistoryDumpFactory();
}