MINIFI-408 - This closes #99. Perform automated determination of compatible bundles for
processor startup. This seeks to provide a MiNiFi friendly process for
handling those cases where NiFi would ghost a component and allow a user
to select a specific bundle in the UI.
MINIFI-348 - Removes validation warnings on startup as bundling
information is evaluated and applied before starting the MiNiFi process.
Signed-off-by: joewitt <joewitt@apache.org>
diff --git a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/src/main/java/org/apache/nifi/minifi/FlowEnricher.java b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/src/main/java/org/apache/nifi/minifi/FlowEnricher.java
new file mode 100644
index 0000000..8f99fd0
--- /dev/null
+++ b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/src/main/java/org/apache/nifi/minifi/FlowEnricher.java
@@ -0,0 +1,268 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.minifi;
+
+import org.apache.nifi.bundle.Bundle;
+import org.apache.nifi.bundle.BundleCoordinate;
+import org.apache.nifi.bundle.BundleDetails;
+import org.apache.nifi.util.NiFiProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
+
+import javax.xml.transform.TransformerException;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+public class FlowEnricher {
+
+ private static final Logger logger = LoggerFactory.getLogger(FlowEnricher.class);
+
+ private final MiNiFi minifi;
+ private final FlowParser flowParser;
+ private final NiFiProperties niFiProperties;
+
+ public static final String PROCESSOR_TAG_NAME = "processor";
+ public static final String CONTROLLER_SERVICE_TAG_NAME = "controllerService";
+ public static final String REPORTING_TASK_TAG_NAME = "reportingTask";
+
+ private static final Pattern UUID_PATTERN = Pattern.compile("[0-9A-F]{8}-[0-9A-F]{4}-[0-9A-F]{4}-[0-9A-F]{4}-[0-9A-F]{12}", Pattern.CASE_INSENSITIVE);
+
+ public FlowEnricher(MiNiFi minifi, FlowParser flowParser, NiFiProperties niFiProperties) {
+ this.minifi = minifi;
+ this.flowParser = flowParser;
+ this.niFiProperties = niFiProperties;
+ }
+
+ /**
+ * Traverse a flow document and enrich all components with bundle pairings that satisfy the constraints presented by
+ * the versions of bundles supplied on the classpath.
+ * <p>
+ * The primary nature of these relationships is comprised of a standlone instance
+ *
+ * @throws FlowEnrichmentException if the provided flow cannot be enriched
+ */
+ public void enrichFlowWithBundleInformation() throws FlowEnrichmentException {
+ final Path flowPath = niFiProperties.getFlowConfigurationFile().toPath();
+ logger.debug("Enriching generated {} with bundling information", flowPath.toAbsolutePath());
+
+ try {
+ // Prepare elements and establish initial bookkeeping to use for analysis
+ final Document flowDocument = flowParser.parse(flowPath.toAbsolutePath().toFile());
+
+ if (flowDocument == null) {
+ throw new FlowEnrichmentException("Unable to successfully parse the specified flow at " + flowPath.toAbsolutePath());
+ }
+
+ // Aggregate all dependency mappings of all component types that need to have a bundle evaluated with their
+ // associated XML information
+ final Map<String, EnrichingElementAdapter> componentEnrichingMap = new HashMap<>();
+
+ // Treat all component types as one map
+ for (String typeElementName : Arrays.asList(PROCESSOR_TAG_NAME, CONTROLLER_SERVICE_TAG_NAME, REPORTING_TASK_TAG_NAME)) {
+ final NodeList componentNodeList = flowDocument.getElementsByTagName(typeElementName);
+ final Map<String, EnrichingElementAdapter> elementIdToMetadataMap = mapComponents(componentNodeList);
+
+ componentEnrichingMap.putAll(elementIdToMetadataMap);
+
+ }
+
+ // For each of the components we have, evaluate its dependencies and apply versions
+ for (Map.Entry<String, EnrichingElementAdapter> componentIdToMetadata : componentEnrichingMap.entrySet()) {
+ // If this particular component has already had bundle information applied, skip it
+ final EnrichingElementAdapter componentToEnrich = componentIdToMetadata.getValue();
+ if (componentToEnrich.getBundleElement() != null) {
+ continue;
+ }
+
+ final String componentToEnrichClass = componentToEnrich.getComponentClass();
+ final Map<String, Bundle> componentToEnrichVersionToBundles = minifi.getBundles(componentToEnrichClass)
+ .stream()
+ .collect(Collectors.toMap(bundle -> bundle.getBundleDetails().getCoordinate().getVersion(), bundle -> bundle));
+
+ enrichComponent(componentToEnrich, componentToEnrichVersionToBundles);
+ // verify error conditions
+ }
+
+ flowParser.writeFlow(flowDocument, flowPath.toAbsolutePath());
+ } catch (IOException | TransformerException e) {
+ throw new FlowEnrichmentException("Unable to successfully automate the enrichment of the generated flow with bundle information", e);
+ }
+ }
+
+ private void enrichComponent(EnrichingElementAdapter componentToEnrich, Map<String, Bundle> componentToEnrichVersionToBundles) throws FlowEnrichmentException {
+
+ if (componentToEnrich.getBundleElement() != null) {
+ return;
+ }
+
+ BundleCoordinate enrichingBundleCoordinate = null;
+ if (!componentToEnrichVersionToBundles.isEmpty()) {
+ // If there is only one supporting bundle, choose it, otherwise carry out additional analysis
+ if (componentToEnrichVersionToBundles.size() == 1) {
+ BundleDetails enrichingBundleDetails = componentToEnrichVersionToBundles.entrySet().iterator().next().getValue().getBundleDetails();
+ enrichingBundleCoordinate = enrichingBundleDetails.getCoordinate();
+ // Adjust the bundle to reflect the values we learned from the Extension Manager
+ componentToEnrich.setBundleInformation(enrichingBundleCoordinate);
+ componentToEnrich.setDependsUponBundleCoordinate(enrichingBundleDetails.getDependencyCoordinate());
+ } else {
+
+ // mUltiple options
+ final Set<String> componentToEnrichBundleVersions = componentToEnrichVersionToBundles.values().stream()
+ .map(bundle -> bundle.getBundleDetails().getCoordinate().getVersion()).collect(Collectors.toSet());
+ final String componentToEnrichId = componentToEnrich.getComponentId();
+ String bundleVersion = componentToEnrichBundleVersions.stream().sorted().reduce((version, otherVersion) -> otherVersion).orElse(null);
+ if (bundleVersion != null) {
+ componentToEnrich.setBundleInformation(componentToEnrichVersionToBundles.get(bundleVersion).getBundleDetails().getCoordinate());
+ }
+ logger.info("Enriching {} with bundle {}", new Object[]{});
+
+ }
+ } else {
+ logger.warn("Could not find any eligible bundles for {}. Automatic start of the flow cannot be guaranteed.", componentToEnrich.getComponentClass());
+ }
+ }
+
+ /**
+ * Find dependent components for the nodes provided.
+ * <p>
+ * We do not have any other information in a generic sense other than that the properties that make use of UUIDs
+ * are eligible to be dependent components; there is no typing that a value is an ID and not just the format of a UUID.
+ * If we find a property that has a UUID as its value, we take note and create a mapping.
+ * If it is a valid ID of another component, we can use this to pair up versions, otherwise, it is ignored.
+ *
+ * @param parentNodes component nodes to map to dependent components (e.g. Processor -> Controller Service)
+ * @return a map of component IDs to their metadata about their relationship
+ */
+ protected static Map<String, EnrichingElementAdapter> mapComponents(NodeList parentNodes) {
+ final Map<String, EnrichingElementAdapter> componentReferenceMap = new HashMap<>();
+ for (int compIdx = 0; compIdx < parentNodes.getLength(); compIdx++) {
+ final Node subjComponent = parentNodes.item(compIdx);
+ final EnrichingElementAdapter enrichingElement = new EnrichingElementAdapter((Element) subjComponent);
+ componentReferenceMap.put(enrichingElement.getComponentId(), enrichingElement);
+ }
+ return componentReferenceMap;
+ }
+
+
+ /*
+ * Convenience class to aid in interacting with the XML elements pertaining to a bundle-able component
+ */
+ public static class EnrichingElementAdapter {
+ public static final String BUNDLE_ELEMENT_NAME = "bundle";
+
+ public static final String GROUP_ELEMENT_NAME = "group";
+ public static final String ARTIFACT_ELEMENT_NAME = "artifact";
+ public static final String VERSION_ELEMENT_NAME = "version";
+
+ public static final String PROPERTY_ELEMENT_NAME = "property";
+
+ // Source object
+ private Element rawElement;
+
+ // Metadata
+ private String id;
+ private String compClass;
+ private Element bundleElement;
+ private BundleCoordinate dependsUponBundleCoordinate;
+
+ public EnrichingElementAdapter(Element element) {
+ this.rawElement = element;
+ }
+
+ public String getComponentId() {
+ if (this.id == null) {
+ this.id = lookupValue("id");
+ }
+ return this.id;
+ }
+
+ public String getComponentClass() {
+ if (this.compClass == null) {
+ this.compClass = lookupValue("class");
+ }
+ return compClass;
+ }
+
+ public Element getBundleElement() {
+ if (this.bundleElement == null) {
+ // Check if the raw element has bundle information, returning it if it does
+ final NodeList bundleElements = this.rawElement.getElementsByTagName(BUNDLE_ELEMENT_NAME);
+ if (bundleElements != null && bundleElements.getLength() == 1) {
+ this.bundleElement = (Element) bundleElements.item(0);
+ }
+ }
+ return this.bundleElement;
+ }
+
+ public List<Element> getProperties() {
+ return FlowParser.getChildrenByTagName(this.rawElement, PROPERTY_ELEMENT_NAME);
+ }
+
+ private String lookupValue(String elementName) {
+ return FlowParser.getChildrenByTagName(this.rawElement, elementName).get(0).getTextContent();
+ }
+
+ public void setBundleInformation(final BundleCoordinate bundleCoordinate) {
+ // If we are handling a component that does not yet have bundle information, create a placeholder element
+ if (this.bundleElement == null) {
+ this.bundleElement = this.rawElement.getOwnerDocument().createElement(BUNDLE_ELEMENT_NAME);
+ for (String elementTag : Arrays.asList(GROUP_ELEMENT_NAME, ARTIFACT_ELEMENT_NAME, VERSION_ELEMENT_NAME)) {
+ this.bundleElement.appendChild(this.bundleElement.getOwnerDocument().createElement(elementTag));
+ }
+ this.rawElement.appendChild(this.bundleElement);
+ }
+ setBundleInformation(bundleCoordinate.getGroup(), bundleCoordinate.getId(), bundleCoordinate.getVersion());
+ }
+
+ private void setBundleInformation(String group, String artifact, String version) {
+ this.bundleElement.getElementsByTagName(GROUP_ELEMENT_NAME).item(0).setTextContent(group);
+ this.bundleElement.getElementsByTagName(ARTIFACT_ELEMENT_NAME).item(0).setTextContent(artifact);
+ this.bundleElement.getElementsByTagName(VERSION_ELEMENT_NAME).item(0).setTextContent(version);
+ }
+
+ public void setDependsUponBundleCoordinate(BundleCoordinate dependsUponBundleCoordinate) {
+ this.dependsUponBundleCoordinate = dependsUponBundleCoordinate;
+ }
+
+ private String getBundleElementPropertyContent(String elementName) {
+ return (getBundleElement() == null) ? null : FlowParser.getChildrenByTagName(this.bundleElement, elementName).get(0).getTextContent();
+ }
+
+ public String getBundleGroup() {
+ return getBundleElementPropertyContent(GROUP_ELEMENT_NAME);
+ }
+
+ public String getBundleId() {
+ return getBundleElementPropertyContent(ARTIFACT_ELEMENT_NAME);
+ }
+
+ public String getBundleVersion() {
+ return getBundleElementPropertyContent(VERSION_ELEMENT_NAME);
+ }
+ }
+}
diff --git a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/src/main/java/org/apache/nifi/minifi/FlowEnrichmentException.java b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/src/main/java/org/apache/nifi/minifi/FlowEnrichmentException.java
new file mode 100644
index 0000000..ac7c089
--- /dev/null
+++ b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/src/main/java/org/apache/nifi/minifi/FlowEnrichmentException.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.minifi;
+
+public class FlowEnrichmentException extends Exception {
+ public FlowEnrichmentException() {
+ super();
+ }
+
+ public FlowEnrichmentException(String message) {
+ super(message);
+ }
+
+ public FlowEnrichmentException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public FlowEnrichmentException(Throwable cause) {
+ super(cause);
+ }
+
+ protected FlowEnrichmentException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
+ super(message, cause, enableSuppression, writableStackTrace);
+ }
+}
diff --git a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/src/main/java/org/apache/nifi/minifi/FlowParser.java b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/src/main/java/org/apache/nifi/minifi/FlowParser.java
new file mode 100644
index 0000000..4fb3e7d
--- /dev/null
+++ b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/src/main/java/org/apache/nifi/minifi/FlowParser.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.minifi;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.util.LoggingXmlParserErrorHandler;
+import org.apache.nifi.util.file.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
+import org.xml.sax.SAXException;
+
+import javax.xml.XMLConstants;
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.ParserConfigurationException;
+import javax.xml.transform.Result;
+import javax.xml.transform.Source;
+import javax.xml.transform.TransformerException;
+import javax.xml.transform.TransformerFactory;
+import javax.xml.transform.dom.DOMSource;
+import javax.xml.transform.stream.StreamResult;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+/**
+ * Parses a flow from its xml.gz format into an XML {@link Document}. This class is primarily toward utilities for assisting
+ * with the handling of component bundles.
+ * <p>
+ * Provides auxiliary methods to aid in evaluating and manipulating the flow.
+ */
+public class FlowParser {
+
+ private static final Logger logger = LoggerFactory.getLogger(FlowParser.class);
+
+ /**
+ * Generates a {@link Document} from the flow configuration file provided
+ */
+ public Document parse(final File flowConfigurationFile) {
+ if (flowConfigurationFile == null) {
+ logger.debug("Flow Configuration file was null");
+ return null;
+ }
+
+ // if the flow doesn't exist or is 0 bytes, then return null
+ final Path flowPath = flowConfigurationFile.toPath();
+ try {
+ if (!Files.exists(flowPath) || Files.size(flowPath) == 0) {
+ logger.warn("Flow Configuration does not exist or was empty");
+ return null;
+ }
+ } catch (IOException e) {
+ logger.error("An error occurred determining the size of the Flow Configuration file");
+ return null;
+ }
+
+ // otherwise create the appropriate input streams to read the file
+ try (final InputStream in = Files.newInputStream(flowPath, StandardOpenOption.READ);
+ final InputStream gzipIn = new GZIPInputStream(in)) {
+
+ final byte[] flowBytes = IOUtils.toByteArray(gzipIn);
+ if (flowBytes == null || flowBytes.length == 0) {
+ logger.warn("Could not extract root group id because Flow Configuration File was empty");
+ return null;
+ }
+
+ final DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance();
+ docFactory.setNamespaceAware(true);
+ docFactory.setFeature(XMLConstants.FEATURE_SECURE_PROCESSING, true);
+
+ // parse the flow
+ final DocumentBuilder docBuilder = docFactory.newDocumentBuilder();
+ docBuilder.setErrorHandler(new LoggingXmlParserErrorHandler("Flow Configuration", logger));
+ final Document document = docBuilder.parse(new ByteArrayInputStream(flowBytes));
+ return document;
+
+ } catch (final SAXException | ParserConfigurationException | IOException ex) {
+ logger.error("Unable to parse flow {} due to {}", new Object[]{flowPath.toAbsolutePath(), ex});
+ return null;
+ }
+ }
+
+
+ /**
+ * Writes a given XML Flow out to the specified path.
+ *
+ * @param flowDocument flowDocument of the associated XML content to write to disk
+ * @param flowXmlPath path on disk to write the flow
+ * @throws IOException if there are issues in accessing the target destination for the flow
+ * @throws TransformerException if there are issues in the xml transformation process
+ */
+ public void writeFlow(final Document flowDocument, final Path flowXmlPath) throws IOException, TransformerException {
+ final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ final Source xmlSource = new DOMSource(flowDocument);
+ final Result outputTarget = new StreamResult(outputStream);
+ TransformerFactory.newInstance().newTransformer().transform(xmlSource, outputTarget);
+ final InputStream is = new ByteArrayInputStream(outputStream.toByteArray());
+
+ try (final OutputStream output = Files.newOutputStream(flowXmlPath, StandardOpenOption.WRITE, StandardOpenOption.CREATE);
+ final OutputStream gzipOut = new GZIPOutputStream(output);) {
+ FileUtils.copy(is, gzipOut);
+ }
+ }
+
+ /**
+ * Finds child elements with the given tagName.
+ *
+ * @param element the parent element
+ * @param tagName the child element name to find
+ * @return a list of matching child elements
+ */
+ public static List<Element> getChildrenByTagName(final Element element, final String tagName) {
+ final List<Element> matches = new ArrayList<>();
+ final NodeList nodeList = element.getChildNodes();
+ for (int i = 0; i < nodeList.getLength(); i++) {
+ final Node node = nodeList.item(i);
+ if (!(node instanceof Element)) {
+ continue;
+ }
+ final Element child = (Element) nodeList.item(i);
+ if (child.getNodeName().equals(tagName)) {
+ matches.add(child);
+ }
+ }
+ return matches;
+ }
+
+}
diff --git a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/src/main/java/org/apache/nifi/minifi/MiNiFi.java b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/src/main/java/org/apache/nifi/minifi/MiNiFi.java
index 8ee1626..0878125 100644
--- a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/src/main/java/org/apache/nifi/minifi/MiNiFi.java
+++ b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/src/main/java/org/apache/nifi/minifi/MiNiFi.java
@@ -32,6 +32,7 @@
import java.lang.Thread.UncaughtExceptionHandler;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
+import java.util.List;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
@@ -54,8 +55,10 @@
public static final String BOOTSTRAP_PORT_PROPERTY = "nifi.bootstrap.listen.port";
private volatile boolean shutdown = false;
+
public MiNiFi(final NiFiProperties properties)
- throws ClassNotFoundException, IOException, NoSuchMethodException, InstantiationException, IllegalAccessException, IllegalArgumentException, InvocationTargetException {
+ throws ClassNotFoundException, IOException, NoSuchMethodException, InstantiationException,
+ IllegalAccessException, IllegalArgumentException, InvocationTargetException, FlowEnrichmentException {
Thread.setDefaultUncaughtExceptionHandler(new UncaughtExceptionHandler() {
@Override
public void uncaughtException(final Thread t, final Throwable e) {
@@ -126,9 +129,14 @@
ExtensionManager.discoverExtensions(systemBundle, narBundles);
ExtensionManager.logClassLoaderMapping();
+ // Enrich the flow xml using the Extension Manager mapping
+ final FlowParser flowParser = new FlowParser();
+ final FlowEnricher flowEnricher = new FlowEnricher(this, flowParser, properties);
+ flowEnricher.enrichFlowWithBundleInformation();
+
// load the server from the framework classloader
Thread.currentThread().setContextClassLoader(frameworkClassLoader);
- Class<?> minifiServerClass= Class.forName("org.apache.nifi.minifi.MiNiFiServer", true, frameworkClassLoader);
+ Class<?> minifiServerClass = Class.forName("org.apache.nifi.minifi.MiNiFiServer", true, frameworkClassLoader);
Constructor<?> minifiServerConstructor = minifiServerClass.getConstructor(NiFiProperties.class);
final long startTime = System.nanoTime();
@@ -215,7 +223,7 @@
if (occurrences.get() < minRequiredOccurrences || occurrencesOutOfRange.get() > maxOccurrencesOutOfRange) {
logger.warn("MiNiFi has detected that this box is not responding within the expected timing interval, which may cause "
- + "Processors to be scheduled erratically. Please see the MiNiFi documentation for more information.");
+ + "Processors to be scheduled erratically. Please see the MiNiFi documentation for more information.");
}
}
};
@@ -241,4 +249,8 @@
logger.error("Failure to launch MiNiFi due to " + t, t);
}
}
+
+ protected List<Bundle> getBundles(final String bundleClass) {
+ return ExtensionManager.getBundles(bundleClass);
+ }
}
diff --git a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/src/test/java/org/apache/nifi/minifi/FlowEnricherTest.java b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/src/test/java/org/apache/nifi/minifi/FlowEnricherTest.java
new file mode 100644
index 0000000..15f493e
--- /dev/null
+++ b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/src/test/java/org/apache/nifi/minifi/FlowEnricherTest.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi;
+
+import org.apache.nifi.bundle.Bundle;
+import org.apache.nifi.bundle.BundleCoordinate;
+import org.apache.nifi.bundle.BundleDetails.Builder;
+import org.apache.nifi.util.NiFiProperties;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+import org.mockito.stubbing.Answer;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.NodeList;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.atLeastOnce;
+
+public class FlowEnricherTest {
+
+ private static final String TEST_FLOW_XML_PATH = "./src/test/resources/flow.xml.gz";
+ private static final File TEST_FLOW_XML_FILE = new File(TEST_FLOW_XML_PATH);
+
+ private static final String KAFKA_PROCESSOR_CLASS = "org.apache.nifi.processors.kafka.pubsub.PublishKafka_0_10";
+ private static final String TAILFILE_PROCESSOR_CLASS = "org.apache.nifi.processors.standard.TailFile";
+ private static final String SPLITCONTENT_PROCESSOR_CLASS = "org.apache.nifi.processors.standard.SplitContent";
+ private static final String SSL_CONTROLLER_SERVICE_CLASS = "org.apache.nifi.ssl.StandardSSLContextService";
+
+ private static final List<String> EXTENSION_CLASSES = Arrays.asList(
+ KAFKA_PROCESSOR_CLASS,
+ TAILFILE_PROCESSOR_CLASS,
+ SPLITCONTENT_PROCESSOR_CLASS,
+ SSL_CONTROLLER_SERVICE_CLASS);
+
+ private static ClassLoader mockClassLoader = Mockito.mock(ClassLoader.class);
+
+ @Test
+ public void enrichFlowWithBundleInformationTest() throws Exception {
+
+ /* Setup Mocks */
+ // Create a copy of the document to use with our mock
+ final FlowParser parser = new FlowParser();
+ final Document flowDocument = parser.parse(TEST_FLOW_XML_FILE);
+
+ final FlowParser mockFlowParser = Mockito.mock(FlowParser.class);
+ Mockito.when(mockFlowParser.parse(any())).thenReturn(flowDocument);
+
+ final MiNiFi minifi = Mockito.mock(MiNiFi.class);
+
+ final NiFiProperties mockProperties = Mockito.mock(NiFiProperties.class);
+ Mockito.when(mockProperties.getFlowConfigurationFile()).thenReturn(new File(TEST_FLOW_XML_PATH));
+
+ final String defaultGroup = "org.apache.nifi";
+ final String aVersion = "1.0.0";
+ final String anotherVersion = "2.0.0";
+
+ final String minifiGroup = "org.apache.nifi.minifi";
+ final String minifiStandardNarId = "minifi-standard-nar";
+ final String minifiVersion = "0.0.2";
+
+ // Standard Services API Bundles
+ final String standardSvcsId = "nifi-standard-services-api-nar";
+ final List<Bundle> standardSvcsBundles = new ArrayList<>();
+ standardSvcsBundles.add(generateBundle(defaultGroup, standardSvcsId, aVersion));
+ standardSvcsBundles.add(generateBundle(defaultGroup, standardSvcsId, anotherVersion));
+
+ // SSL Context Service Bundles - depends on nifi-standard-services-api
+ final String sslContextSvcId = " nifi-ssl-context-service-nar";
+ final List<Bundle> sslBundles = new ArrayList<>();
+ sslBundles.add(generateBundle(defaultGroup, sslContextSvcId, aVersion, standardSvcsBundles.get(0).getBundleDetails().getCoordinate()));
+ sslBundles.add(generateBundle(defaultGroup, sslContextSvcId, anotherVersion, standardSvcsBundles.get(1).getBundleDetails().getCoordinate()));
+
+ // MiNiFi Standard NAR Bundle
+ List<Bundle> minifiStdBundles = new ArrayList<>();
+ minifiStdBundles.add(generateBundle(minifiGroup, minifiStandardNarId, minifiVersion, standardSvcsBundles.get(0).getBundleDetails().getCoordinate()));
+
+ // Kafka depends on SSL
+ List<Bundle> kafkaBundles = new ArrayList<>();
+ final String kafkaId = "nifi-kafka-0-10-nar";
+ kafkaBundles.add(generateBundle(defaultGroup, kafkaId, anotherVersion, standardSvcsBundles.get(1).getBundleDetails().getCoordinate()));
+
+
+ /* If we are evaluating potential problem children components, provide a tailored bundle.
+ * Otherwise, these can be sourced from the standard NAR.
+ */
+ Mockito.when(minifi.getBundles(anyString())).thenAnswer((Answer<List<Bundle>>) invocationOnMock -> {
+ final String requestedClass = (String) invocationOnMock.getArguments()[0];
+ switch (requestedClass) {
+ case KAFKA_PROCESSOR_CLASS:
+ return kafkaBundles;
+ case SSL_CONTROLLER_SERVICE_CLASS:
+ return sslBundles;
+ default:
+ return minifiStdBundles;
+ }
+ });
+
+ /* Perform Test */
+ final FlowEnricher flowEnricher = new FlowEnricher(minifi, mockFlowParser, mockProperties);
+ flowEnricher.enrichFlowWithBundleInformation();
+
+ final ArgumentCaptor<String> bundleLookupCaptor = ArgumentCaptor.forClass(String.class);
+
+ // Inspect the document to ensure all components were enriched
+ Mockito.verify(minifi, atLeastOnce()).getBundles(bundleLookupCaptor.capture());
+ final List<String> allValues = bundleLookupCaptor.getAllValues();
+
+ // Verify each class performed a bundle look up
+ EXTENSION_CLASSES.stream().forEach(procClass -> Assert.assertTrue(allValues.contains(procClass)));
+
+ // Verify bundles are correctly applied in our flow document
+ // We have three processors, one controller service, and no reporting tasks
+ final NodeList processorNodes = flowDocument.getElementsByTagName(FlowEnricher.PROCESSOR_TAG_NAME);
+ Assert.assertEquals("Incorrect number of processors", 3, processorNodes.getLength());
+
+ final NodeList controllerServiceNodes = flowDocument.getElementsByTagName(FlowEnricher.CONTROLLER_SERVICE_TAG_NAME);
+ Assert.assertEquals("Incorrect number of controller services", 1, controllerServiceNodes.getLength());
+
+ final NodeList reportingTaskNodes = flowDocument.getElementsByTagName(FlowEnricher.REPORTING_TASK_TAG_NAME);
+ Assert.assertEquals("Incorrect number of reporting tasks", 0, reportingTaskNodes.getLength());
+
+ for (int i = 0; i < processorNodes.getLength(); i++) {
+ final Element componentElement = (Element) processorNodes.item(i);
+ Assert.assertEquals(1, componentElement.getElementsByTagName(FlowEnricher.EnrichingElementAdapter.BUNDLE_ELEMENT_NAME).getLength());
+ final FlowEnricher.EnrichingElementAdapter elementAdapter = new FlowEnricher.EnrichingElementAdapter(componentElement);
+
+ // Only our Kafka processor has a bundle outside of the standard bundle
+ if (elementAdapter.getComponentClass().equalsIgnoreCase(KAFKA_PROCESSOR_CLASS)) {
+ verifyBundleProperties(elementAdapter, defaultGroup, kafkaId, anotherVersion);
+ } else {
+ verifyBundleProperties(elementAdapter, minifiGroup, minifiStandardNarId, minifiVersion);
+ }
+ }
+
+ for (int i = 0; i < controllerServiceNodes.getLength(); i++) {
+ final Element componentElement = (Element) controllerServiceNodes.item(i);
+ Assert.assertEquals(1, componentElement.getElementsByTagName(FlowEnricher.EnrichingElementAdapter.BUNDLE_ELEMENT_NAME).getLength());
+ final FlowEnricher.EnrichingElementAdapter elementAdapter = new FlowEnricher.EnrichingElementAdapter(componentElement);
+
+ // Only our Kafka processor has a bundle outside of the standard bundle
+ if (elementAdapter.getComponentClass().equalsIgnoreCase(SSL_CONTROLLER_SERVICE_CLASS)) {
+ verifyBundleProperties(elementAdapter, defaultGroup, sslContextSvcId, anotherVersion);
+ } else {
+ Assert.fail("A controller service that was not an SSL Controller service was found.");
+ }
+ }
+
+ // Verify the updated flow was persisted
+ Mockito.verify(mockFlowParser, atLeastOnce()).writeFlow(any(), any());
+ }
+
+ /* utility methods to generate and verify test objects */
+
+ private static Bundle generateBundle(String group, String id, String version) {
+ return generateBundle(group, id, version, null, null, null);
+ }
+
+
+ private static Bundle generateBundle(String group, String id, String version, String dependencyGroup, String dependencyId, String dependencyVersion) {
+ final BundleCoordinate dependencyCoordinate =
+ (dependencyGroup != null && dependencyId != null && dependencyVersion != null)
+ ? new BundleCoordinate(dependencyGroup, dependencyId, dependencyVersion) : null;
+
+ return generateBundle(group, id, version, dependencyCoordinate);
+ }
+
+ private static Bundle generateBundle(String group, String id, String version, BundleCoordinate dependencyCoordinate) {
+
+ final File workingDirectory = new File("src/test/resources");
+
+ final BundleCoordinate bundleCoordinate = new BundleCoordinate(group, id, version);
+
+ final String buildTag = "HEAD";
+ final String buildRevision = "1";
+ final String buildBranch = "DEV";
+ final String buildTimestamp = "2017-01-01 00:00:00";
+ final String buildJdk = "JDK8";
+ final String builtBy = "somebody";
+
+ Builder bundleBuilder = new Builder()
+ .workingDir(workingDirectory)
+ .coordinate(bundleCoordinate)
+ .buildTag(buildTag)
+ .buildRevision(buildRevision)
+ .buildBranch(buildBranch)
+ .buildTimestamp(buildTimestamp)
+ .buildJdk(buildJdk)
+ .builtBy(builtBy);
+
+ if (dependencyCoordinate != null) {
+ bundleBuilder.dependencyCoordinate(dependencyCoordinate);
+ }
+ return new Bundle(bundleBuilder.build(), mockClassLoader);
+ }
+
+ private static void verifyBundleProperties(FlowEnricher.EnrichingElementAdapter capturedComponent, String expectedGroup, String expectedArtifact, String expectedVersion) {
+ Assert.assertEquals("Wrong group for component", expectedGroup, capturedComponent.getBundleGroup());
+ Assert.assertEquals("Wrong id for component", expectedArtifact, capturedComponent.getBundleId());
+ Assert.assertEquals("Wrong version for component", expectedVersion, capturedComponent.getBundleVersion());
+ }
+}
diff --git a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/src/test/java/org/apache/nifi/minifi/FlowParserTest.java b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/src/test/java/org/apache/nifi/minifi/FlowParserTest.java
new file mode 100644
index 0000000..2ffc26f
--- /dev/null
+++ b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/src/test/java/org/apache/nifi/minifi/FlowParserTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.minifi;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.NodeList;
+
+import java.io.File;
+import java.util.List;
+
+public class FlowParserTest {
+
+ private static final String TEST_FLOW_XML_PATH = "./src/test/resources/flow.xml.gz";
+
+ @Test
+ public void testCanParseFlow() {
+ FlowParser flowParser = new FlowParser();
+ Document parsedFlow = flowParser.parse(new File(TEST_FLOW_XML_PATH));
+
+ Assert.assertNotNull(parsedFlow);
+
+ NodeList processors = parsedFlow.getElementsByTagName(FlowEnricher.PROCESSOR_TAG_NAME);
+ NodeList controllerServices = parsedFlow.getElementsByTagName(FlowEnricher.CONTROLLER_SERVICE_TAG_NAME);
+ NodeList reportingTasks = parsedFlow.getElementsByTagName(FlowEnricher.REPORTING_TASK_TAG_NAME);
+
+ Assert.assertEquals("Verify correct number of processors", 3, processors.getLength());
+ Assert.assertEquals("Verify correct number of controller services", 1, controllerServices.getLength());
+ Assert.assertEquals("Verify correct number of reporting tasks", 0, reportingTasks.getLength());
+ }
+
+ @Test
+ public void testGetChildrenByTagName() {
+ FlowParser flowParser = new FlowParser();
+ Document parsedFlow = flowParser.parse(new File(TEST_FLOW_XML_PATH));
+ NodeList processors = parsedFlow.getElementsByTagName(FlowEnricher.PROCESSOR_TAG_NAME);
+
+ for (int i = 0; i < processors.getLength(); i++) {
+ Element processor = (Element) processors.item(i);
+
+ List<Element> classElements = FlowParser.getChildrenByTagName(processor, "class");
+ Assert.assertEquals(1, classElements.size());
+
+ List<Element> propertyElements = FlowParser.getChildrenByTagName(processor, "property");
+ Assert.assertTrue(propertyElements.size() > 1);
+ }
+ }
+}
diff --git a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/src/test/resources/flow.xml.gz b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/src/test/resources/flow.xml.gz
new file mode 100644
index 0000000..187ef54
--- /dev/null
+++ b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/src/test/resources/flow.xml.gz
Binary files differ