MINIFI-408 - This closes #99. Perform automated determination of compatible bundles for
processor startup.  This seeks to provide a MiNiFi friendly process for
handling those cases where NiFi would ghost a component and allow a user
to select a specific bundle in the UI.

MINIFI-348 - Removes validation warnings on startup as bundling
information is evaluated and applied before starting the MiNiFi process.

Signed-off-by: joewitt <joewitt@apache.org>
diff --git a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/src/main/java/org/apache/nifi/minifi/FlowEnricher.java b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/src/main/java/org/apache/nifi/minifi/FlowEnricher.java
new file mode 100644
index 0000000..8f99fd0
--- /dev/null
+++ b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/src/main/java/org/apache/nifi/minifi/FlowEnricher.java
@@ -0,0 +1,268 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.minifi;
+
+import org.apache.nifi.bundle.Bundle;
+import org.apache.nifi.bundle.BundleCoordinate;
+import org.apache.nifi.bundle.BundleDetails;
+import org.apache.nifi.util.NiFiProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
+
+import javax.xml.transform.TransformerException;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+public class FlowEnricher {
+
+    private static final Logger logger = LoggerFactory.getLogger(FlowEnricher.class);
+
+    private final MiNiFi minifi;
+    private final FlowParser flowParser;
+    private final NiFiProperties niFiProperties;
+
+    public static final String PROCESSOR_TAG_NAME = "processor";
+    public static final String CONTROLLER_SERVICE_TAG_NAME = "controllerService";
+    public static final String REPORTING_TASK_TAG_NAME = "reportingTask";
+
+    private static final Pattern UUID_PATTERN = Pattern.compile("[0-9A-F]{8}-[0-9A-F]{4}-[0-9A-F]{4}-[0-9A-F]{4}-[0-9A-F]{12}", Pattern.CASE_INSENSITIVE);
+
+    public FlowEnricher(MiNiFi minifi, FlowParser flowParser, NiFiProperties niFiProperties) {
+        this.minifi = minifi;
+        this.flowParser = flowParser;
+        this.niFiProperties = niFiProperties;
+    }
+
+    /**
+     * Traverse a flow document and enrich all components with bundle pairings that satisfy the constraints presented by
+     * the versions of bundles supplied on the classpath.
+     * <p>
+     * The primary nature of these relationships is comprised of a standlone instance
+     *
+     * @throws FlowEnrichmentException if the provided flow cannot be enriched
+     */
+    public void enrichFlowWithBundleInformation() throws FlowEnrichmentException {
+        final Path flowPath = niFiProperties.getFlowConfigurationFile().toPath();
+        logger.debug("Enriching generated {} with bundling information", flowPath.toAbsolutePath());
+
+        try {
+            // Prepare elements and establish initial bookkeeping to use for analysis
+            final Document flowDocument = flowParser.parse(flowPath.toAbsolutePath().toFile());
+
+            if (flowDocument == null) {
+                throw new FlowEnrichmentException("Unable to successfully parse the specified flow at " + flowPath.toAbsolutePath());
+            }
+
+            // Aggregate all dependency mappings of all component types that need to have a bundle evaluated with their
+            // associated XML information
+            final Map<String, EnrichingElementAdapter> componentEnrichingMap = new HashMap<>();
+
+            // Treat all component types as one map
+            for (String typeElementName : Arrays.asList(PROCESSOR_TAG_NAME, CONTROLLER_SERVICE_TAG_NAME, REPORTING_TASK_TAG_NAME)) {
+                final NodeList componentNodeList = flowDocument.getElementsByTagName(typeElementName);
+                final Map<String, EnrichingElementAdapter> elementIdToMetadataMap = mapComponents(componentNodeList);
+
+                componentEnrichingMap.putAll(elementIdToMetadataMap);
+
+            }
+
+            // For each of the components we have, evaluate its dependencies and apply versions
+            for (Map.Entry<String, EnrichingElementAdapter> componentIdToMetadata : componentEnrichingMap.entrySet()) {
+                // If this particular component has already had bundle information applied, skip it
+                final EnrichingElementAdapter componentToEnrich = componentIdToMetadata.getValue();
+                if (componentToEnrich.getBundleElement() != null) {
+                    continue;
+                }
+
+                final String componentToEnrichClass = componentToEnrich.getComponentClass();
+                final Map<String, Bundle> componentToEnrichVersionToBundles = minifi.getBundles(componentToEnrichClass)
+                        .stream()
+                        .collect(Collectors.toMap(bundle -> bundle.getBundleDetails().getCoordinate().getVersion(), bundle -> bundle));
+
+                enrichComponent(componentToEnrich, componentToEnrichVersionToBundles);
+                // verify error conditions
+            }
+
+            flowParser.writeFlow(flowDocument, flowPath.toAbsolutePath());
+        } catch (IOException | TransformerException e) {
+            throw new FlowEnrichmentException("Unable to successfully automate the enrichment of the generated flow with bundle information", e);
+        }
+    }
+
+    private void enrichComponent(EnrichingElementAdapter componentToEnrich, Map<String, Bundle> componentToEnrichVersionToBundles) throws FlowEnrichmentException {
+
+        if (componentToEnrich.getBundleElement() != null) {
+            return;
+        }
+
+        BundleCoordinate enrichingBundleCoordinate = null;
+        if (!componentToEnrichVersionToBundles.isEmpty()) {
+            // If there is only one supporting bundle, choose it, otherwise carry out additional analysis
+            if (componentToEnrichVersionToBundles.size() == 1) {
+                BundleDetails enrichingBundleDetails = componentToEnrichVersionToBundles.entrySet().iterator().next().getValue().getBundleDetails();
+                enrichingBundleCoordinate = enrichingBundleDetails.getCoordinate();
+                // Adjust the bundle to reflect the values we learned from the Extension Manager
+                componentToEnrich.setBundleInformation(enrichingBundleCoordinate);
+                componentToEnrich.setDependsUponBundleCoordinate(enrichingBundleDetails.getDependencyCoordinate());
+            } else {
+
+                // mUltiple options
+                final Set<String> componentToEnrichBundleVersions = componentToEnrichVersionToBundles.values().stream()
+                        .map(bundle -> bundle.getBundleDetails().getCoordinate().getVersion()).collect(Collectors.toSet());
+                final String componentToEnrichId = componentToEnrich.getComponentId();
+                String bundleVersion = componentToEnrichBundleVersions.stream().sorted().reduce((version, otherVersion) -> otherVersion).orElse(null);
+                if (bundleVersion != null) {
+                    componentToEnrich.setBundleInformation(componentToEnrichVersionToBundles.get(bundleVersion).getBundleDetails().getCoordinate());
+                }
+                logger.info("Enriching {} with bundle {}", new Object[]{});
+
+            }
+        } else {
+            logger.warn("Could not find any eligible bundles for {}.  Automatic start of the flow cannot be guaranteed.", componentToEnrich.getComponentClass());
+        }
+    }
+
+    /**
+     * Find dependent components for the nodes provided.
+     * <p>
+     * We do not have any other information in a generic sense other than that the  properties that make use of UUIDs
+     * are eligible to be dependent components; there is no typing that a value is an ID and not just the format of a UUID.
+     * If we find a property that has a UUID as its value, we take note and create a mapping.
+     * If it is a valid ID of another component, we can use this to pair up versions, otherwise, it is ignored.
+     *
+     * @param parentNodes component nodes to map to dependent components (e.g. Processor -> Controller Service)
+     * @return a map of component IDs to their metadata about their relationship
+     */
+    protected static Map<String, EnrichingElementAdapter> mapComponents(NodeList parentNodes) {
+        final Map<String, EnrichingElementAdapter> componentReferenceMap = new HashMap<>();
+        for (int compIdx = 0; compIdx < parentNodes.getLength(); compIdx++) {
+            final Node subjComponent = parentNodes.item(compIdx);
+            final EnrichingElementAdapter enrichingElement = new EnrichingElementAdapter((Element) subjComponent);
+            componentReferenceMap.put(enrichingElement.getComponentId(), enrichingElement);
+        }
+        return componentReferenceMap;
+    }
+
+
+    /*
+     * Convenience class to aid in interacting with the XML elements pertaining to a bundle-able component
+     */
+    public static class EnrichingElementAdapter {
+        public static final String BUNDLE_ELEMENT_NAME = "bundle";
+
+        public static final String GROUP_ELEMENT_NAME = "group";
+        public static final String ARTIFACT_ELEMENT_NAME = "artifact";
+        public static final String VERSION_ELEMENT_NAME = "version";
+
+        public static final String PROPERTY_ELEMENT_NAME = "property";
+
+        // Source object
+        private Element rawElement;
+
+        // Metadata
+        private String id;
+        private String compClass;
+        private Element bundleElement;
+        private BundleCoordinate dependsUponBundleCoordinate;
+
+        public EnrichingElementAdapter(Element element) {
+            this.rawElement = element;
+        }
+
+        public String getComponentId() {
+            if (this.id == null) {
+                this.id = lookupValue("id");
+            }
+            return this.id;
+        }
+
+        public String getComponentClass() {
+            if (this.compClass == null) {
+                this.compClass = lookupValue("class");
+            }
+            return compClass;
+        }
+
+        public Element getBundleElement() {
+            if (this.bundleElement == null) {
+                // Check if the raw element has bundle information, returning it if it does
+                final NodeList bundleElements = this.rawElement.getElementsByTagName(BUNDLE_ELEMENT_NAME);
+                if (bundleElements != null && bundleElements.getLength() == 1) {
+                    this.bundleElement = (Element) bundleElements.item(0);
+                }
+            }
+            return this.bundleElement;
+        }
+
+        public List<Element> getProperties() {
+            return FlowParser.getChildrenByTagName(this.rawElement, PROPERTY_ELEMENT_NAME);
+        }
+
+        private String lookupValue(String elementName) {
+            return FlowParser.getChildrenByTagName(this.rawElement, elementName).get(0).getTextContent();
+        }
+
+        public void setBundleInformation(final BundleCoordinate bundleCoordinate) {
+            // If we are handling a component that does not yet have bundle information, create a placeholder element
+            if (this.bundleElement == null) {
+                this.bundleElement = this.rawElement.getOwnerDocument().createElement(BUNDLE_ELEMENT_NAME);
+                for (String elementTag : Arrays.asList(GROUP_ELEMENT_NAME, ARTIFACT_ELEMENT_NAME, VERSION_ELEMENT_NAME)) {
+                    this.bundleElement.appendChild(this.bundleElement.getOwnerDocument().createElement(elementTag));
+                }
+                this.rawElement.appendChild(this.bundleElement);
+            }
+            setBundleInformation(bundleCoordinate.getGroup(), bundleCoordinate.getId(), bundleCoordinate.getVersion());
+        }
+
+        private void setBundleInformation(String group, String artifact, String version) {
+            this.bundleElement.getElementsByTagName(GROUP_ELEMENT_NAME).item(0).setTextContent(group);
+            this.bundleElement.getElementsByTagName(ARTIFACT_ELEMENT_NAME).item(0).setTextContent(artifact);
+            this.bundleElement.getElementsByTagName(VERSION_ELEMENT_NAME).item(0).setTextContent(version);
+        }
+
+        public void setDependsUponBundleCoordinate(BundleCoordinate dependsUponBundleCoordinate) {
+            this.dependsUponBundleCoordinate = dependsUponBundleCoordinate;
+        }
+
+        private String getBundleElementPropertyContent(String elementName) {
+            return (getBundleElement() == null) ? null : FlowParser.getChildrenByTagName(this.bundleElement, elementName).get(0).getTextContent();
+        }
+
+        public String getBundleGroup() {
+            return getBundleElementPropertyContent(GROUP_ELEMENT_NAME);
+        }
+
+        public String getBundleId() {
+            return getBundleElementPropertyContent(ARTIFACT_ELEMENT_NAME);
+        }
+
+        public String getBundleVersion() {
+            return getBundleElementPropertyContent(VERSION_ELEMENT_NAME);
+        }
+    }
+}
diff --git a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/src/main/java/org/apache/nifi/minifi/FlowEnrichmentException.java b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/src/main/java/org/apache/nifi/minifi/FlowEnrichmentException.java
new file mode 100644
index 0000000..ac7c089
--- /dev/null
+++ b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/src/main/java/org/apache/nifi/minifi/FlowEnrichmentException.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.minifi;
+
+public class FlowEnrichmentException extends Exception {
+    public FlowEnrichmentException() {
+        super();
+    }
+
+    public FlowEnrichmentException(String message) {
+        super(message);
+    }
+
+    public FlowEnrichmentException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public FlowEnrichmentException(Throwable cause) {
+        super(cause);
+    }
+
+    protected FlowEnrichmentException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
+        super(message, cause, enableSuppression, writableStackTrace);
+    }
+}
diff --git a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/src/main/java/org/apache/nifi/minifi/FlowParser.java b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/src/main/java/org/apache/nifi/minifi/FlowParser.java
new file mode 100644
index 0000000..4fb3e7d
--- /dev/null
+++ b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/src/main/java/org/apache/nifi/minifi/FlowParser.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.minifi;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.util.LoggingXmlParserErrorHandler;
+import org.apache.nifi.util.file.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
+import org.xml.sax.SAXException;
+
+import javax.xml.XMLConstants;
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.ParserConfigurationException;
+import javax.xml.transform.Result;
+import javax.xml.transform.Source;
+import javax.xml.transform.TransformerException;
+import javax.xml.transform.TransformerFactory;
+import javax.xml.transform.dom.DOMSource;
+import javax.xml.transform.stream.StreamResult;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+/**
+ * Parses a flow from its xml.gz format into an XML {@link Document}.  This class is primarily toward utilities for assisting
+ * with the handling of component bundles.
+ * <p>
+ * Provides auxiliary methods to aid in evaluating and manipulating the flow.
+ */
+public class FlowParser {
+
+    private static final Logger logger = LoggerFactory.getLogger(FlowParser.class);
+
+    /**
+     * Generates a {@link Document} from the flow configuration file provided
+     */
+    public Document parse(final File flowConfigurationFile) {
+        if (flowConfigurationFile == null) {
+            logger.debug("Flow Configuration file was null");
+            return null;
+        }
+
+        // if the flow doesn't exist or is 0 bytes, then return null
+        final Path flowPath = flowConfigurationFile.toPath();
+        try {
+            if (!Files.exists(flowPath) || Files.size(flowPath) == 0) {
+                logger.warn("Flow Configuration does not exist or was empty");
+                return null;
+            }
+        } catch (IOException e) {
+            logger.error("An error occurred determining the size of the Flow Configuration file");
+            return null;
+        }
+
+        // otherwise create the appropriate input streams to read the file
+        try (final InputStream in = Files.newInputStream(flowPath, StandardOpenOption.READ);
+             final InputStream gzipIn = new GZIPInputStream(in)) {
+
+            final byte[] flowBytes = IOUtils.toByteArray(gzipIn);
+            if (flowBytes == null || flowBytes.length == 0) {
+                logger.warn("Could not extract root group id because Flow Configuration File was empty");
+                return null;
+            }
+
+            final DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance();
+            docFactory.setNamespaceAware(true);
+            docFactory.setFeature(XMLConstants.FEATURE_SECURE_PROCESSING, true);
+
+            // parse the flow
+            final DocumentBuilder docBuilder = docFactory.newDocumentBuilder();
+            docBuilder.setErrorHandler(new LoggingXmlParserErrorHandler("Flow Configuration", logger));
+            final Document document = docBuilder.parse(new ByteArrayInputStream(flowBytes));
+            return document;
+
+        } catch (final SAXException | ParserConfigurationException | IOException ex) {
+            logger.error("Unable to parse flow {} due to {}", new Object[]{flowPath.toAbsolutePath(), ex});
+            return null;
+        }
+    }
+
+
+    /**
+     * Writes a given XML Flow out to the specified path.
+     *
+     * @param flowDocument flowDocument of the associated XML content to write to disk
+     * @param flowXmlPath  path on disk to write the flow
+     * @throws IOException if there are issues in accessing the target destination for the flow
+     * @throws TransformerException if there are issues in the xml transformation process
+     */
+    public void writeFlow(final Document flowDocument, final Path flowXmlPath) throws IOException, TransformerException {
+        final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        final Source xmlSource = new DOMSource(flowDocument);
+        final Result outputTarget = new StreamResult(outputStream);
+        TransformerFactory.newInstance().newTransformer().transform(xmlSource, outputTarget);
+        final InputStream is = new ByteArrayInputStream(outputStream.toByteArray());
+
+        try (final OutputStream output = Files.newOutputStream(flowXmlPath, StandardOpenOption.WRITE, StandardOpenOption.CREATE);
+             final OutputStream gzipOut = new GZIPOutputStream(output);) {
+            FileUtils.copy(is, gzipOut);
+        }
+    }
+
+    /**
+     * Finds child elements with the given tagName.
+     *
+     * @param element the parent element
+     * @param tagName the child element name to find
+     * @return a list of matching child elements
+     */
+    public static List<Element> getChildrenByTagName(final Element element, final String tagName) {
+        final List<Element> matches = new ArrayList<>();
+        final NodeList nodeList = element.getChildNodes();
+        for (int i = 0; i < nodeList.getLength(); i++) {
+            final Node node = nodeList.item(i);
+            if (!(node instanceof Element)) {
+                continue;
+            }
+            final Element child = (Element) nodeList.item(i);
+            if (child.getNodeName().equals(tagName)) {
+                matches.add(child);
+            }
+        }
+        return matches;
+    }
+
+}
diff --git a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/src/main/java/org/apache/nifi/minifi/MiNiFi.java b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/src/main/java/org/apache/nifi/minifi/MiNiFi.java
index 8ee1626..0878125 100644
--- a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/src/main/java/org/apache/nifi/minifi/MiNiFi.java
+++ b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/src/main/java/org/apache/nifi/minifi/MiNiFi.java
@@ -32,6 +32,7 @@
 import java.lang.Thread.UncaughtExceptionHandler;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
+import java.util.List;
 import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
@@ -54,8 +55,10 @@
     public static final String BOOTSTRAP_PORT_PROPERTY = "nifi.bootstrap.listen.port";
     private volatile boolean shutdown = false;
 
+
     public MiNiFi(final NiFiProperties properties)
-        throws ClassNotFoundException, IOException, NoSuchMethodException, InstantiationException, IllegalAccessException, IllegalArgumentException, InvocationTargetException {
+            throws ClassNotFoundException, IOException, NoSuchMethodException, InstantiationException,
+            IllegalAccessException, IllegalArgumentException, InvocationTargetException, FlowEnrichmentException {
         Thread.setDefaultUncaughtExceptionHandler(new UncaughtExceptionHandler() {
             @Override
             public void uncaughtException(final Thread t, final Throwable e) {
@@ -126,9 +129,14 @@
         ExtensionManager.discoverExtensions(systemBundle, narBundles);
         ExtensionManager.logClassLoaderMapping();
 
+        // Enrich the flow xml using the Extension Manager mapping
+        final FlowParser flowParser = new FlowParser();
+        final FlowEnricher flowEnricher = new FlowEnricher(this, flowParser, properties);
+        flowEnricher.enrichFlowWithBundleInformation();
+
         // load the server from the framework classloader
         Thread.currentThread().setContextClassLoader(frameworkClassLoader);
-        Class<?> minifiServerClass= Class.forName("org.apache.nifi.minifi.MiNiFiServer", true, frameworkClassLoader);
+        Class<?> minifiServerClass = Class.forName("org.apache.nifi.minifi.MiNiFiServer", true, frameworkClassLoader);
         Constructor<?> minifiServerConstructor = minifiServerClass.getConstructor(NiFiProperties.class);
 
         final long startTime = System.nanoTime();
@@ -215,7 +223,7 @@
 
                 if (occurrences.get() < minRequiredOccurrences || occurrencesOutOfRange.get() > maxOccurrencesOutOfRange) {
                     logger.warn("MiNiFi has detected that this box is not responding within the expected timing interval, which may cause "
-                        + "Processors to be scheduled erratically. Please see the MiNiFi documentation for more information.");
+                            + "Processors to be scheduled erratically. Please see the MiNiFi documentation for more information.");
                 }
             }
         };
@@ -241,4 +249,8 @@
             logger.error("Failure to launch MiNiFi due to " + t, t);
         }
     }
+
+    protected List<Bundle> getBundles(final String bundleClass) {
+        return ExtensionManager.getBundles(bundleClass);
+    }
 }
diff --git a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/src/test/java/org/apache/nifi/minifi/FlowEnricherTest.java b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/src/test/java/org/apache/nifi/minifi/FlowEnricherTest.java
new file mode 100644
index 0000000..15f493e
--- /dev/null
+++ b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/src/test/java/org/apache/nifi/minifi/FlowEnricherTest.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi;
+
+import org.apache.nifi.bundle.Bundle;
+import org.apache.nifi.bundle.BundleCoordinate;
+import org.apache.nifi.bundle.BundleDetails.Builder;
+import org.apache.nifi.util.NiFiProperties;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+import org.mockito.stubbing.Answer;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.NodeList;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.atLeastOnce;
+
+public class FlowEnricherTest {
+
+    private static final String TEST_FLOW_XML_PATH = "./src/test/resources/flow.xml.gz";
+    private static final File TEST_FLOW_XML_FILE = new File(TEST_FLOW_XML_PATH);
+
+    private static final String KAFKA_PROCESSOR_CLASS = "org.apache.nifi.processors.kafka.pubsub.PublishKafka_0_10";
+    private static final String TAILFILE_PROCESSOR_CLASS = "org.apache.nifi.processors.standard.TailFile";
+    private static final String SPLITCONTENT_PROCESSOR_CLASS = "org.apache.nifi.processors.standard.SplitContent";
+    private static final String SSL_CONTROLLER_SERVICE_CLASS = "org.apache.nifi.ssl.StandardSSLContextService";
+
+    private static final List<String> EXTENSION_CLASSES = Arrays.asList(
+            KAFKA_PROCESSOR_CLASS,
+            TAILFILE_PROCESSOR_CLASS,
+            SPLITCONTENT_PROCESSOR_CLASS,
+            SSL_CONTROLLER_SERVICE_CLASS);
+
+    private static ClassLoader mockClassLoader = Mockito.mock(ClassLoader.class);
+
+    @Test
+    public void enrichFlowWithBundleInformationTest() throws Exception {
+
+        /* Setup Mocks */
+        // Create a copy of the document to use with our mock
+        final FlowParser parser = new FlowParser();
+        final Document flowDocument = parser.parse(TEST_FLOW_XML_FILE);
+
+        final FlowParser mockFlowParser = Mockito.mock(FlowParser.class);
+        Mockito.when(mockFlowParser.parse(any())).thenReturn(flowDocument);
+
+        final MiNiFi minifi = Mockito.mock(MiNiFi.class);
+
+        final NiFiProperties mockProperties = Mockito.mock(NiFiProperties.class);
+        Mockito.when(mockProperties.getFlowConfigurationFile()).thenReturn(new File(TEST_FLOW_XML_PATH));
+
+        final String defaultGroup = "org.apache.nifi";
+        final String aVersion = "1.0.0";
+        final String anotherVersion = "2.0.0";
+
+        final String minifiGroup = "org.apache.nifi.minifi";
+        final String minifiStandardNarId = "minifi-standard-nar";
+        final String minifiVersion = "0.0.2";
+
+        // Standard Services API Bundles
+        final String standardSvcsId = "nifi-standard-services-api-nar";
+        final List<Bundle> standardSvcsBundles = new ArrayList<>();
+        standardSvcsBundles.add(generateBundle(defaultGroup, standardSvcsId, aVersion));
+        standardSvcsBundles.add(generateBundle(defaultGroup, standardSvcsId, anotherVersion));
+
+        // SSL Context Service Bundles - depends on nifi-standard-services-api
+        final String sslContextSvcId = " nifi-ssl-context-service-nar";
+        final List<Bundle> sslBundles = new ArrayList<>();
+        sslBundles.add(generateBundle(defaultGroup, sslContextSvcId, aVersion, standardSvcsBundles.get(0).getBundleDetails().getCoordinate()));
+        sslBundles.add(generateBundle(defaultGroup, sslContextSvcId, anotherVersion, standardSvcsBundles.get(1).getBundleDetails().getCoordinate()));
+
+        // MiNiFi Standard NAR Bundle
+        List<Bundle> minifiStdBundles = new ArrayList<>();
+        minifiStdBundles.add(generateBundle(minifiGroup, minifiStandardNarId, minifiVersion, standardSvcsBundles.get(0).getBundleDetails().getCoordinate()));
+
+        // Kafka depends on SSL
+        List<Bundle> kafkaBundles = new ArrayList<>();
+        final String kafkaId = "nifi-kafka-0-10-nar";
+        kafkaBundles.add(generateBundle(defaultGroup, kafkaId, anotherVersion, standardSvcsBundles.get(1).getBundleDetails().getCoordinate()));
+
+
+       /* If we are evaluating potential problem children components, provide a tailored bundle.
+        * Otherwise, these can be sourced from the standard NAR.
+        */
+        Mockito.when(minifi.getBundles(anyString())).thenAnswer((Answer<List<Bundle>>) invocationOnMock -> {
+            final String requestedClass = (String) invocationOnMock.getArguments()[0];
+            switch (requestedClass) {
+                case KAFKA_PROCESSOR_CLASS:
+                    return kafkaBundles;
+                case SSL_CONTROLLER_SERVICE_CLASS:
+                    return sslBundles;
+                default:
+                    return minifiStdBundles;
+            }
+        });
+
+        /* Perform Test */
+        final FlowEnricher flowEnricher = new FlowEnricher(minifi, mockFlowParser, mockProperties);
+        flowEnricher.enrichFlowWithBundleInformation();
+
+        final ArgumentCaptor<String> bundleLookupCaptor = ArgumentCaptor.forClass(String.class);
+
+        // Inspect the document to ensure all components were enriched
+        Mockito.verify(minifi, atLeastOnce()).getBundles(bundleLookupCaptor.capture());
+        final List<String> allValues = bundleLookupCaptor.getAllValues();
+
+        // Verify each class performed a bundle look up
+        EXTENSION_CLASSES.stream().forEach(procClass -> Assert.assertTrue(allValues.contains(procClass)));
+
+        // Verify bundles are correctly applied in our flow document
+        // We have three processors, one controller service, and no reporting tasks
+        final NodeList processorNodes = flowDocument.getElementsByTagName(FlowEnricher.PROCESSOR_TAG_NAME);
+        Assert.assertEquals("Incorrect number of processors", 3, processorNodes.getLength());
+
+        final NodeList controllerServiceNodes = flowDocument.getElementsByTagName(FlowEnricher.CONTROLLER_SERVICE_TAG_NAME);
+        Assert.assertEquals("Incorrect number of controller services", 1, controllerServiceNodes.getLength());
+
+        final NodeList reportingTaskNodes = flowDocument.getElementsByTagName(FlowEnricher.REPORTING_TASK_TAG_NAME);
+        Assert.assertEquals("Incorrect number of reporting tasks", 0, reportingTaskNodes.getLength());
+
+        for (int i = 0; i < processorNodes.getLength(); i++) {
+            final Element componentElement = (Element) processorNodes.item(i);
+            Assert.assertEquals(1, componentElement.getElementsByTagName(FlowEnricher.EnrichingElementAdapter.BUNDLE_ELEMENT_NAME).getLength());
+            final FlowEnricher.EnrichingElementAdapter elementAdapter = new FlowEnricher.EnrichingElementAdapter(componentElement);
+
+            // Only our Kafka processor has a bundle outside of the standard bundle
+            if (elementAdapter.getComponentClass().equalsIgnoreCase(KAFKA_PROCESSOR_CLASS)) {
+                verifyBundleProperties(elementAdapter, defaultGroup, kafkaId, anotherVersion);
+            } else {
+                verifyBundleProperties(elementAdapter, minifiGroup, minifiStandardNarId, minifiVersion);
+            }
+        }
+
+        for (int i = 0; i < controllerServiceNodes.getLength(); i++) {
+            final Element componentElement = (Element) controllerServiceNodes.item(i);
+            Assert.assertEquals(1, componentElement.getElementsByTagName(FlowEnricher.EnrichingElementAdapter.BUNDLE_ELEMENT_NAME).getLength());
+            final FlowEnricher.EnrichingElementAdapter elementAdapter = new FlowEnricher.EnrichingElementAdapter(componentElement);
+
+            // Only our Kafka processor has a bundle outside of the standard bundle
+            if (elementAdapter.getComponentClass().equalsIgnoreCase(SSL_CONTROLLER_SERVICE_CLASS)) {
+                verifyBundleProperties(elementAdapter, defaultGroup, sslContextSvcId, anotherVersion);
+            } else {
+                Assert.fail("A controller service that was not an SSL Controller service was found.");
+            }
+        }
+
+        // Verify the updated flow was persisted
+        Mockito.verify(mockFlowParser, atLeastOnce()).writeFlow(any(), any());
+    }
+
+    /* utility methods to generate and verify test objects */
+
+    private static Bundle generateBundle(String group, String id, String version) {
+        return generateBundle(group, id, version, null, null, null);
+    }
+
+
+    private static Bundle generateBundle(String group, String id, String version, String dependencyGroup, String dependencyId, String dependencyVersion) {
+        final BundleCoordinate dependencyCoordinate =
+                (dependencyGroup != null && dependencyId != null && dependencyVersion != null)
+                        ? new BundleCoordinate(dependencyGroup, dependencyId, dependencyVersion) : null;
+
+        return generateBundle(group, id, version, dependencyCoordinate);
+    }
+
+    private static Bundle generateBundle(String group, String id, String version, BundleCoordinate dependencyCoordinate) {
+
+        final File workingDirectory = new File("src/test/resources");
+
+        final BundleCoordinate bundleCoordinate = new BundleCoordinate(group, id, version);
+
+        final String buildTag = "HEAD";
+        final String buildRevision = "1";
+        final String buildBranch = "DEV";
+        final String buildTimestamp = "2017-01-01 00:00:00";
+        final String buildJdk = "JDK8";
+        final String builtBy = "somebody";
+
+        Builder bundleBuilder = new Builder()
+                .workingDir(workingDirectory)
+                .coordinate(bundleCoordinate)
+                .buildTag(buildTag)
+                .buildRevision(buildRevision)
+                .buildBranch(buildBranch)
+                .buildTimestamp(buildTimestamp)
+                .buildJdk(buildJdk)
+                .builtBy(builtBy);
+
+        if (dependencyCoordinate != null) {
+            bundleBuilder.dependencyCoordinate(dependencyCoordinate);
+        }
+        return new Bundle(bundleBuilder.build(), mockClassLoader);
+    }
+
+    private static void verifyBundleProperties(FlowEnricher.EnrichingElementAdapter capturedComponent, String expectedGroup, String expectedArtifact, String expectedVersion) {
+        Assert.assertEquals("Wrong group for component", expectedGroup, capturedComponent.getBundleGroup());
+        Assert.assertEquals("Wrong id for component", expectedArtifact, capturedComponent.getBundleId());
+        Assert.assertEquals("Wrong version for component", expectedVersion, capturedComponent.getBundleVersion());
+    }
+}
diff --git a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/src/test/java/org/apache/nifi/minifi/FlowParserTest.java b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/src/test/java/org/apache/nifi/minifi/FlowParserTest.java
new file mode 100644
index 0000000..2ffc26f
--- /dev/null
+++ b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/src/test/java/org/apache/nifi/minifi/FlowParserTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.minifi;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.NodeList;
+
+import java.io.File;
+import java.util.List;
+
+public class FlowParserTest {
+
+    private static final String TEST_FLOW_XML_PATH = "./src/test/resources/flow.xml.gz";
+
+    @Test
+    public void testCanParseFlow() {
+        FlowParser flowParser = new FlowParser();
+        Document parsedFlow = flowParser.parse(new File(TEST_FLOW_XML_PATH));
+
+        Assert.assertNotNull(parsedFlow);
+
+        NodeList processors = parsedFlow.getElementsByTagName(FlowEnricher.PROCESSOR_TAG_NAME);
+        NodeList controllerServices = parsedFlow.getElementsByTagName(FlowEnricher.CONTROLLER_SERVICE_TAG_NAME);
+        NodeList reportingTasks = parsedFlow.getElementsByTagName(FlowEnricher.REPORTING_TASK_TAG_NAME);
+
+        Assert.assertEquals("Verify correct number of processors", 3, processors.getLength());
+        Assert.assertEquals("Verify correct number of controller services", 1, controllerServices.getLength());
+        Assert.assertEquals("Verify correct number of reporting tasks", 0, reportingTasks.getLength());
+    }
+
+    @Test
+    public void testGetChildrenByTagName() {
+        FlowParser flowParser = new FlowParser();
+        Document parsedFlow = flowParser.parse(new File(TEST_FLOW_XML_PATH));
+        NodeList processors = parsedFlow.getElementsByTagName(FlowEnricher.PROCESSOR_TAG_NAME);
+
+        for (int i = 0; i < processors.getLength(); i++) {
+            Element processor = (Element) processors.item(i);
+
+            List<Element> classElements = FlowParser.getChildrenByTagName(processor, "class");
+            Assert.assertEquals(1, classElements.size());
+
+            List<Element> propertyElements = FlowParser.getChildrenByTagName(processor, "property");
+            Assert.assertTrue(propertyElements.size() > 1);
+        }
+    }
+}
diff --git a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/src/test/resources/flow.xml.gz b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/src/test/resources/flow.xml.gz
new file mode 100644
index 0000000..187ef54
--- /dev/null
+++ b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/src/test/resources/flow.xml.gz
Binary files differ