MINIFI-255 This closes #84. Update codebase to incorporate NiFi 1.2.0 handling of NARs and required dependencies
MINIFI-256 Support Site to Site attaching to an interface in config

Signed-off-by: joewitt <joewitt@apache.org>
diff --git a/NOTICE b/NOTICE
index c1edd0c..6ee6948 100644
--- a/NOTICE
+++ b/NOTICE
@@ -3,3 +3,5 @@
 
 This product includes software developed at
 The Apache Software Foundation (http://www.apache.org/).
+
+This includes derived works from the Apache NiFi (ASLv2) project, including numerous source files from the core framework API.
diff --git a/minifi-assembly/pom.xml b/minifi-assembly/pom.xml
index 1ce48a7..f28ee89 100644
--- a/minifi-assembly/pom.xml
+++ b/minifi-assembly/pom.xml
@@ -187,6 +187,10 @@
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-persistent-provenance-repository</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-distributed-cache-client-service-api</artifactId>
+        </dependency>
 
         <!-- Provided in NiFi so must include here too -->
         <dependency>
diff --git a/minifi-assembly/src/main/assembly/dependencies.xml b/minifi-assembly/src/main/assembly/dependencies.xml
index d77b098..0605f23 100644
--- a/minifi-assembly/src/main/assembly/dependencies.xml
+++ b/minifi-assembly/src/main/assembly/dependencies.xml
@@ -36,7 +36,6 @@
                 <exclude>minifi-bootstrap</exclude>
                 <exclude>minifi-resources</exclude>
                 <!-- Filter items introduced via transitive dependencies that are provided in associated NARs -->
-                <exclude>zookeeper</exclude>
                 <exclude>spring-aop</exclude>
                 <exclude>spring-context</exclude>
                 <exclude>spring-beans</exclude>
@@ -46,7 +45,6 @@
                 <exclude>jaxb-impl</exclude>
                 <exclude>mail</exclude>
                 <exclude>log4j</exclude>
-                <exclude>lucene-analyzers-common</exclude>
                 <exclude>lucene-queryparser</exclude>
                 <exclude>commons-net</exclude>
                 <exclude>spring-context</exclude>
diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java
index edd4a36..e01c256 100644
--- a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java
+++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java
@@ -77,7 +77,7 @@
 
 public final class ConfigTransformer {
     // Underlying version of NIFI will be using
-    public static final String NIFI_VERSION = "1.1.0";
+    public static final String NIFI_VERSION = "1.2.0";
     public static final String ROOT_GROUP = "Root-Group";
     public static final String DEFAULT_PROV_REPORTING_TASK_CLASS = "org.apache.nifi.reporting.SiteToSiteProvenanceReportingTask";
     public static final String NIFI_VERSION_KEY = "nifi.version";
@@ -576,6 +576,7 @@
             for (RemotePortSchema remoteOutputPortSchema : remoteOutputPorts) {
                 addRemoteGroupPort(element, remoteOutputPortSchema, "outputPort");
             }
+            addTextElement(element, "networkInterface", remoteProcessGroupProperties.getLocalNetworkInterface());
 
             parentElement.appendChild(element);
         } catch (Exception e) {
diff --git a/minifi-bootstrap/src/test/resources/config.yml b/minifi-bootstrap/src/test/resources/config.yml
index 3b20902..e18d74a 100644
--- a/minifi-bootstrap/src/test/resources/config.yml
+++ b/minifi-bootstrap/src/test/resources/config.yml
@@ -87,6 +87,7 @@
   url: https://localhost:8090/nifi
   comment: ''
   timeout: 30 secs
+  local network interface: eth1
   yield period: 10 sec
   Input Ports:
   - id: 8644cbcc-a45c-40e0-964d-5e536e2ada61
diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/RemoteProcessGroupSchema.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/RemoteProcessGroupSchema.java
index 5a7593a..c22deff 100644
--- a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/RemoteProcessGroupSchema.java
+++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/RemoteProcessGroupSchema.java
@@ -38,6 +38,7 @@
     public static final String PROXY_PORT_KEY = "proxy port";
     public static final String PROXY_USER_KEY = "proxy user";
     public static final String PROXY_PASSWORD_KEY = "proxy password";
+    public static final String LOCAL_NETWORK_INTERFACE_KEY = "local network interface";
 
     public static final String EXPECTED_PROXY_HOST_IF_PROXY_PORT = "expected " + PROXY_HOST_KEY + " to be set if " + PROXY_PORT_KEY + " is";
     public static final String EXPECTED_PROXY_HOST_IF_PROXY_USER = "expected " + PROXY_HOST_KEY + " to be set if " + PROXY_USER_KEY + " is";
@@ -60,6 +61,7 @@
     public static final Integer DEFAULT_PROXY_PORT = null;
     public static final String DEFAULT_PROXY_USER = "";
     public static final String DEFAULT_PROXY_PASSWORD = "";
+    public static final String DEFAULT_NETWORK_INTERFACE = "";
 
     private String url;
     private List<RemotePortSchema> inputPorts;
@@ -73,6 +75,7 @@
     private Integer proxyPort = DEFAULT_PROXY_PORT;
     private String proxyUser = DEFAULT_PROXY_USER;
     private String proxyPassword = DEFAULT_PROXY_PASSWORD;
+    private String localNetworkInterface = DEFAULT_NETWORK_INTERFACE;
 
     public RemoteProcessGroupSchema(Map map) {
         super(map, "RemoteProcessGroup(id: {id}, name: {name})");
@@ -98,6 +101,8 @@
             addValidationIssue(TRANSPORT_PROTOCOL_KEY, wrapperName, "it must be either 'RAW' or 'HTTP' but is '" + transportProtocol + "'");
         }
 
+        localNetworkInterface = getOptionalKeyAsType(map, LOCAL_NETWORK_INTERFACE_KEY, String.class, wrapperName, DEFAULT_NETWORK_INTERFACE);
+
         proxyHost = getOptionalKeyAsType(map, PROXY_HOST_KEY, String.class, wrapperName, DEFAULT_PROXY_HOST);
         proxyPort = getOptionalKeyAsType(map, PROXY_PORT_KEY, Integer.class, wrapperName, DEFAULT_PROXY_PORT);
         proxyUser = getOptionalKeyAsType(map, PROXY_USER_KEY, String.class, wrapperName, DEFAULT_PROXY_USER);
@@ -121,6 +126,7 @@
         } else if (StringUtil.isNullOrEmpty(proxyPassword)) {
             addValidationIssue(PROXY_USER_KEY, wrapperName, EXPECTED_PROXY_PASSWORD_IF_PROXY_USER);
         }
+
     }
 
     @Override
@@ -135,6 +141,7 @@
         result.put(PROXY_PORT_KEY, proxyPort == null ? "" : proxyPort);
         result.put(PROXY_USER_KEY, proxyUser);
         result.put(PROXY_PASSWORD_KEY, proxyPassword);
+        result.put(LOCAL_NETWORK_INTERFACE_KEY, localNetworkInterface);
         putListIfNotNull(result, INPUT_PORTS_KEY, inputPorts);
         putListIfNotNull(result, OUTPUT_PORTS_KEY, outputPorts);
         return result;
@@ -203,4 +210,12 @@
     public String getProxyPassword() {
         return proxyPassword;
     }
+
+    public void setLocalNetworkInterface(String localNetworkInterface) {
+        this.localNetworkInterface = localNetworkInterface;
+    }
+
+    public String getLocalNetworkInterface() {
+        return localNetworkInterface;
+    }
 }
diff --git a/minifi-commons/minifi-commons-schema/src/test/java/org/apache/nifi/minifi/commons/schema/RemoteProcessGroupSchemaTest.java b/minifi-commons/minifi-commons-schema/src/test/java/org/apache/nifi/minifi/commons/schema/RemoteProcessGroupSchemaTest.java
index 2339f40..d1a6174 100644
--- a/minifi-commons/minifi-commons-schema/src/test/java/org/apache/nifi/minifi/commons/schema/RemoteProcessGroupSchemaTest.java
+++ b/minifi-commons/minifi-commons-schema/src/test/java/org/apache/nifi/minifi/commons/schema/RemoteProcessGroupSchemaTest.java
@@ -73,6 +73,18 @@
     }
 
     @Test
+    public void testLocalNetworkInterface() {
+        Map<String, Object> map = new HashMap<>();
+        map.put(CommonPropertyKeys.INPUT_PORTS_KEY, Arrays.asList(createPortSchema("f94d2469-39f8-4f07-a0d8-acd9396f639e", "testName", ConfigSchema.TOP_LEVEL_NAME).toMap()));
+        map.put(RemoteProcessGroupSchema.URL_KEY, "http://localhost:8080/nifi");
+        map.put(CommonPropertyKeys.ID_KEY, "a58d2fab-7efe-4cb7-8224-12a60bd8003d");
+
+        map.put(RemoteProcessGroupSchema.LOCAL_NETWORK_INTERFACE_KEY, "eth1");
+        RemoteProcessGroupSchema first =  new RemoteProcessGroupSchema(map);
+        validateIssuesNumMatches(0, first);
+        assertEquals("eth1", first.getLocalNetworkInterface());
+    }
+    @Test
     public void testProxySettings() {
         Map<String, Object> map = new HashMap<>();
         map.put(RemoteProcessGroupSchema.PROXY_PORT_KEY, 1234);
diff --git a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/init/ConfigurableComponentInitializer.java b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/init/ConfigurableComponentInitializer.java
new file mode 100644
index 0000000..9a1149c
--- /dev/null
+++ b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/init/ConfigurableComponentInitializer.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.init;
+
+import org.apache.nifi.components.ConfigurableComponent;
+import org.apache.nifi.reporting.InitializationException;
+
+/**
+ * An interface for initializing and tearing down a ConfigurableComponent. It is up to the
+ * implementer to call "init" so that you can call
+ * ConfigurableComponent.getPropertyDescriptors()
+ *
+ */
+public interface ConfigurableComponentInitializer {
+
+    /**
+     * Initializes a configurable component to the point that you can call
+     * getPropertyDescriptors() on it
+     *
+     * @param component the component to initialize
+     * @throws InitializationException if the component could not be initialized
+     */
+    void initialize(ConfigurableComponent component) throws InitializationException;
+
+    /**
+     * Calls the lifecycle methods that should be called when a flow is shutdown.
+     *
+     * @param component the component to initialize
+     */
+    void teardown(ConfigurableComponent component);
+}
diff --git a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/init/ConfigurableComponentInitializerFactory.java b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/init/ConfigurableComponentInitializerFactory.java
new file mode 100644
index 0000000..f6ab922
--- /dev/null
+++ b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/init/ConfigurableComponentInitializerFactory.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.init;
+
+import org.apache.nifi.components.ConfigurableComponent;
+import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.reporting.ReportingTask;
+
+public class ConfigurableComponentInitializerFactory {
+
+    /**
+     * Returns a ConfigurableComponentInitializer for the type of component.
+     * Currently Processor, ControllerService and ReportingTask are supported.
+     *
+     * @param componentClass the class that requires a ConfigurableComponentInitializer
+     * @return a ConfigurableComponentInitializer capable of initializing that specific type of class
+     */
+    public static ConfigurableComponentInitializer createComponentInitializer(final Class<? extends ConfigurableComponent> componentClass) {
+        if (Processor.class.isAssignableFrom(componentClass)) {
+            return new ProcessorInitializer();
+        } else if (ControllerService.class.isAssignableFrom(componentClass)) {
+            return new ControllerServiceInitializer();
+        } else if (ReportingTask.class.isAssignableFrom(componentClass)) {
+            return new ReportingTaskingInitializer();
+        }
+
+        return null;
+    }
+}
diff --git a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/init/ControllerServiceInitializer.java b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/init/ControllerServiceInitializer.java
new file mode 100644
index 0000000..21b107f
--- /dev/null
+++ b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/init/ControllerServiceInitializer.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.init;
+
+import org.apache.nifi.annotation.lifecycle.OnShutdown;
+import org.apache.nifi.components.ConfigurableComponent;
+import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.controller.ControllerServiceInitializationContext;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.mock.MockComponentLogger;
+import org.apache.nifi.mock.MockConfigurationContext;
+import org.apache.nifi.mock.MockControllerServiceInitializationContext;
+import org.apache.nifi.nar.ExtensionManager;
+import org.apache.nifi.nar.NarCloseable;
+import org.apache.nifi.reporting.InitializationException;
+
+/**
+ * Initializes a ControllerService using a MockControllerServiceInitializationContext
+ *
+ *
+ */
+public class ControllerServiceInitializer implements ConfigurableComponentInitializer {
+
+    @Override
+    public void initialize(ConfigurableComponent component) throws InitializationException {
+        ControllerService controllerService = (ControllerService) component;
+        ControllerServiceInitializationContext context = new MockControllerServiceInitializationContext();
+        try (NarCloseable narCloseable = NarCloseable.withComponentNarLoader(component.getClass(), context.getIdentifier())) {
+            controllerService.initialize(context);
+        }
+    }
+
+    @Override
+    public void teardown(ConfigurableComponent component) {
+        try (NarCloseable narCloseable = NarCloseable.withComponentNarLoader(component.getClass(), component.getIdentifier())) {
+            ControllerService controllerService = (ControllerService) component;
+
+            final ComponentLog logger = new MockComponentLogger();
+            final MockConfigurationContext context = new MockConfigurationContext();
+            ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnShutdown.class, controllerService, logger, context);
+        } finally {
+            ExtensionManager.removeInstanceClassLoader(component.getIdentifier());
+        }
+    }
+}
diff --git a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/init/ProcessorInitializer.java b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/init/ProcessorInitializer.java
new file mode 100644
index 0000000..06fdead
--- /dev/null
+++ b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/init/ProcessorInitializer.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.init;
+
+import org.apache.nifi.annotation.lifecycle.OnShutdown;
+import org.apache.nifi.components.ConfigurableComponent;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.mock.MockComponentLogger;
+import org.apache.nifi.mock.MockProcessContext;
+import org.apache.nifi.mock.MockProcessorInitializationContext;
+import org.apache.nifi.nar.ExtensionManager;
+import org.apache.nifi.nar.NarCloseable;
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+
+/**
+ * Initializes a Processor using a MockProcessorInitializationContext
+ *
+ *
+ */
+public class ProcessorInitializer implements ConfigurableComponentInitializer {
+
+    @Override
+    public void initialize(ConfigurableComponent component) {
+        Processor processor = (Processor) component;
+        ProcessorInitializationContext initializationContext = new MockProcessorInitializationContext();
+        try (NarCloseable narCloseable = NarCloseable.withComponentNarLoader(component.getClass(), initializationContext.getIdentifier())) {
+            processor.initialize(initializationContext);
+        }
+    }
+
+    @Override
+    public void teardown(ConfigurableComponent component) {
+        Processor processor = (Processor) component;
+        try (NarCloseable narCloseable = NarCloseable.withComponentNarLoader(component.getClass(), component.getIdentifier())) {
+
+            final ComponentLog logger = new MockComponentLogger();
+            final MockProcessContext context = new MockProcessContext();
+            ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnShutdown.class, processor, logger, context);
+        } finally {
+            ExtensionManager.removeInstanceClassLoader(component.getIdentifier());
+        }
+    }
+}
diff --git a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/init/ReflectionUtils.java b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/init/ReflectionUtils.java
new file mode 100644
index 0000000..22420bd
--- /dev/null
+++ b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/init/ReflectionUtils.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.init;
+
+import org.apache.nifi.logging.ComponentLog;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.annotation.Annotation;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+
+/**
+ * This class is a copy of org.apache.nifi.util.ReflectionUtils. Ultimately the
+ * documentation generation component should be moved to a place where it can
+ * depend on this directly instead of copying it in.
+ *
+ *
+ */
+public class ReflectionUtils {
+
+    private final static Logger LOG = LoggerFactory.getLogger(ReflectionUtils.class);
+
+    /**
+     * Invokes all methods on the given instance that have been annotated with
+     * the given annotation. If the signature of the method that is defined in
+     * <code>instance</code> uses 1 or more parameters, those parameters must be
+     * specified by the <code>args</code> parameter. However, if more arguments
+     * are supplied by the <code>args</code> parameter than needed, the extra
+     * arguments will be ignored.
+     *
+     * @param annotation annotation
+     * @param instance instance
+     * @param logger the ComponentLog to use for logging any errors. If null,
+     * will use own logger, but that will not generate bulletins or easily tie
+     * to the Processor's log messages.
+     * @param args args
+     * @return <code>true</code> if all appropriate methods were invoked and
+     * returned without throwing an Exception, <code>false</code> if one of the
+     * methods threw an Exception or could not be invoked; if <code>false</code>
+     * is returned, an error will have been logged.
+     */
+    public static boolean quietlyInvokeMethodsWithAnnotation(
+            final Class<? extends Annotation> annotation, final Object instance, final ComponentLog logger, final Object... args) {
+
+        for (final Method method : instance.getClass().getMethods()) {
+            if (method.isAnnotationPresent(annotation)) {
+
+                final boolean isAccessible = method.isAccessible();
+                method.setAccessible(true);
+
+                try {
+                    final Class<?>[] argumentTypes = method.getParameterTypes();
+                    if (argumentTypes.length > args.length) {
+                        if (logger == null) {
+                            LOG.error("Unable to invoke method {} on {} because method expects {} parameters but only {} were given",
+                                    new Object[]{method.getName(), instance, argumentTypes.length, args.length});
+                        } else {
+                            logger.error("Unable to invoke method {} on {} because method expects {} parameters but only {} were given",
+                                    new Object[]{method.getName(), instance, argumentTypes.length, args.length});
+                        }
+
+                        return false;
+                    }
+
+                    for (int i = 0; i < argumentTypes.length; i++) {
+                        final Class<?> argType = argumentTypes[i];
+                        if (!argType.isAssignableFrom(args[i].getClass())) {
+                            if (logger == null) {
+                                LOG.error("Unable to invoke method {} on {} because method parameter {} is expected to be of type {} but argument passed was of type {}",
+                                        new Object[]{method.getName(), instance, i, argType, args[i].getClass()});
+                            } else {
+                                logger.error("Unable to invoke method {} on {} because method parameter {} is expected to be of type {} but argument passed was of type {}",
+                                        new Object[]{method.getName(), instance, i, argType, args[i].getClass()});
+                            }
+
+                            return false;
+                        }
+                    }
+
+                    try {
+                        if (argumentTypes.length == args.length) {
+                            method.invoke(instance, args);
+                        } else {
+                            final Object[] argsToPass = new Object[argumentTypes.length];
+                            for (int i = 0; i < argsToPass.length; i++) {
+                                argsToPass[i] = args[i];
+                            }
+
+                            method.invoke(instance, argsToPass);
+                        }
+                    } catch (final InvocationTargetException ite) {
+                        if (logger == null) {
+                            LOG.error("Unable to invoke method {} on {} due to {}", new Object[]{method.getName(), instance, ite.getCause()});
+                            LOG.error("", ite.getCause());
+                        } else {
+                            logger.error("Unable to invoke method {} on {} due to {}", new Object[]{method.getName(), instance, ite.getCause()});
+                        }
+                    } catch (final IllegalAccessException | IllegalArgumentException t) {
+                        if (logger == null) {
+                            LOG.error("Unable to invoke method {} on {} due to {}", new Object[]{method.getName(), instance, t});
+                            LOG.error("", t);
+                        } else {
+                            logger.error("Unable to invoke method {} on {} due to {}", new Object[]{method.getName(), instance, t});
+                        }
+
+                        return false;
+                    }
+                } finally {
+                    if (!isAccessible) {
+                        method.setAccessible(false);
+                    }
+                }
+            }
+        }
+
+        return true;
+    }
+}
diff --git a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/init/ReportingTaskingInitializer.java b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/init/ReportingTaskingInitializer.java
new file mode 100644
index 0000000..f0f495d
--- /dev/null
+++ b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/init/ReportingTaskingInitializer.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.init;
+
+import org.apache.nifi.annotation.lifecycle.OnShutdown;
+import org.apache.nifi.components.ConfigurableComponent;
+import org.apache.nifi.mock.MockComponentLogger;
+import org.apache.nifi.mock.MockConfigurationContext;
+import org.apache.nifi.mock.MockReportingInitializationContext;
+import org.apache.nifi.nar.ExtensionManager;
+import org.apache.nifi.nar.NarCloseable;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.reporting.ReportingInitializationContext;
+import org.apache.nifi.reporting.ReportingTask;
+
+/**
+ * Initializes a ReportingTask using a MockReportingInitializationContext.
+ *
+ *
+ */
+public class ReportingTaskingInitializer implements ConfigurableComponentInitializer {
+
+    @Override
+    public void initialize(ConfigurableComponent component) throws InitializationException {
+        ReportingTask reportingTask = (ReportingTask) component;
+        ReportingInitializationContext context = new MockReportingInitializationContext();
+        try (NarCloseable narCloseable = NarCloseable.withComponentNarLoader(component.getClass(), context.getIdentifier())) {
+            reportingTask.initialize(context);
+        }
+    }
+
+    @Override
+    public void teardown(ConfigurableComponent component) {
+        ReportingTask reportingTask = (ReportingTask) component;
+        try (NarCloseable narCloseable = NarCloseable.withComponentNarLoader(component.getClass(), component.getIdentifier())) {
+
+            final MockConfigurationContext context = new MockConfigurationContext();
+            ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnShutdown.class, reportingTask, new MockComponentLogger(), context);
+        } finally {
+            ExtensionManager.removeInstanceClassLoader(component.getIdentifier());
+        }
+    }
+}
diff --git a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/mock/MockComponentLogger.java b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/mock/MockComponentLogger.java
new file mode 100644
index 0000000..920d7eb
--- /dev/null
+++ b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/mock/MockComponentLogger.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.mock;
+
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.logging.LogLevel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Stubs out the functionality of a ComponentLog so that it can
+ * be used during initialization of a component; every call is
+ * delegated to a shared SLF4J logger at the matching level.
+ */
+public class MockComponentLogger implements ComponentLog {
+
+    private static final Logger logger = LoggerFactory
+            .getLogger(MockComponentLogger.class);
+
+    @Override
+    public void warn(String msg, Throwable t) {
+        logger.warn(msg, t);
+    }
+
+    @Override
+    public void warn(String msg, Object[] os) {
+        logger.warn(msg, os);
+    }
+
+    @Override
+    public void warn(String msg, Object[] os, Throwable t) {
+        logger.warn(msg, os);
+        logger.warn("", t);
+    }
+
+    @Override
+    public void warn(String msg) {
+        logger.warn(msg);
+    }
+
+    @Override
+    public void trace(String msg, Throwable t) {
+        logger.trace(msg, t);
+    }
+
+    @Override
+    public void trace(String msg, Object[] os) {
+        logger.trace(msg, os);
+    }
+
+    @Override
+    public void trace(String msg) {
+        logger.trace(msg);
+    }
+
+    @Override
+    public void trace(String msg, Object[] os, Throwable t) {
+        logger.trace(msg, os);
+        logger.trace("", t);
+    }
+
+    @Override
+    public boolean isWarnEnabled() {
+        return logger.isWarnEnabled();
+    }
+
+    @Override
+    public boolean isTraceEnabled() {
+        return logger.isTraceEnabled();
+    }
+
+    @Override
+    public boolean isInfoEnabled() {
+        return logger.isInfoEnabled();
+    }
+
+    @Override
+    public boolean isErrorEnabled() {
+        return logger.isErrorEnabled();
+    }
+
+    @Override
+    public boolean isDebugEnabled() {
+        return logger.isDebugEnabled();
+    }
+
+    @Override
+    public void info(String msg, Throwable t) {
+        logger.info(msg, t);
+    }
+
+    @Override
+    public void info(String msg, Object[] os) {
+        logger.info(msg, os);
+    }
+
+    @Override
+    public void info(String msg) {
+        logger.info(msg);
+
+    }
+
+    @Override
+    public void info(String msg, Object[] os, Throwable t) {
+        logger.info(msg, os); // was logger.trace — copy/paste bug; log at info like the other overloads
+        logger.info("", t);
+
+    }
+
+    @Override
+    public String getName() {
+        return logger.getName();
+    }
+
+    @Override
+    public void error(String msg, Throwable t) {
+        logger.error(msg, t);
+    }
+
+    @Override
+    public void error(String msg, Object[] os) {
+        logger.error(msg, os);
+    }
+
+    @Override
+    public void error(String msg) {
+        logger.error(msg);
+    }
+
+    @Override
+    public void error(String msg, Object[] os, Throwable t) {
+        logger.error(msg, os);
+        logger.error("", t);
+    }
+
+    @Override
+    public void debug(String msg, Throwable t) {
+        logger.debug(msg, t);
+    }
+
+    @Override
+    public void debug(String msg, Object[] os) {
+        logger.debug(msg, os);
+    }
+
+    @Override
+    public void debug(String msg, Object[] os, Throwable t) {
+        logger.debug(msg, os);
+        logger.debug("", t);
+    }
+
+    @Override
+    public void debug(String msg) {
+        logger.debug(msg);
+    }
+
+    @Override
+    public void log(LogLevel level, String msg, Throwable t) {
+        switch (level) {
+            case DEBUG:
+                debug(msg, t);
+                break;
+            case ERROR:
+            case FATAL:
+                error(msg, t);
+                break;
+            case INFO:
+                info(msg, t);
+                break;
+            case TRACE:
+                trace(msg, t);
+                break;
+            case WARN:
+                warn(msg, t);
+                break;
+        }
+    }
+
+    @Override
+    public void log(LogLevel level, String msg, Object[] os) {
+        switch (level) {
+            case DEBUG:
+                debug(msg, os);
+                break;
+            case ERROR:
+            case FATAL:
+                error(msg, os);
+                break;
+            case INFO:
+                info(msg, os);
+                break;
+            case TRACE:
+                trace(msg, os);
+                break;
+            case WARN:
+                warn(msg, os);
+                break;
+        }
+    }
+
+    @Override
+    public void log(LogLevel level, String msg) {
+        switch (level) {
+            case DEBUG:
+                debug(msg);
+                break;
+            case ERROR:
+            case FATAL:
+                error(msg);
+                break;
+            case INFO:
+                info(msg);
+                break;
+            case TRACE:
+                trace(msg);
+                break;
+            case WARN:
+                warn(msg);
+                break;
+        }
+    }
+
+    @Override
+    public void log(LogLevel level, String msg, Object[] os, Throwable t) {
+        switch (level) {
+            case DEBUG:
+                debug(msg, os, t);
+                break;
+            case ERROR:
+            case FATAL:
+                error(msg, os, t);
+                break;
+            case INFO:
+                info(msg, os, t);
+                break;
+            case TRACE:
+                trace(msg, os, t);
+                break;
+            case WARN:
+                warn(msg, os, t);
+                break;
+        }
+    }
+}
diff --git a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/mock/MockConfigurationContext.java b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/mock/MockConfigurationContext.java
new file mode 100644
index 0000000..d1e73fb
--- /dev/null
+++ b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/mock/MockConfigurationContext.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.mock;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.controller.ConfigurationContext;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class MockConfigurationContext implements ConfigurationContext {
+
+    @Override
+    public Map<PropertyDescriptor, String> getProperties() {
+        return Collections.emptyMap();
+    }
+
+    @Override
+    public PropertyValue getProperty(final PropertyDescriptor descriptor) {
+        return null;
+    }
+
+    @Override
+    public Long getSchedulingPeriod(final TimeUnit timeUnit) {
+        return 0L;
+    }
+
+    @Override
+    public String getSchedulingPeriod() {
+        return "0 secs";
+    }
+}
diff --git a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/mock/MockControllerServiceInitializationContext.java b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/mock/MockControllerServiceInitializationContext.java
new file mode 100644
index 0000000..b111ad2
--- /dev/null
+++ b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/mock/MockControllerServiceInitializationContext.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.mock;
+
+import org.apache.nifi.components.state.StateManager;
+import org.apache.nifi.controller.ControllerServiceInitializationContext;
+import org.apache.nifi.controller.ControllerServiceLookup;
+import org.apache.nifi.logging.ComponentLog;
+
+import java.io.File;
+
+/**
+ * A Mock ControllerServiceInitializationContext so that ControllerServices
+ * can be initialized without a running framework: the identifier is the
+ * fixed string "mock-controller-service", logger/lookup resolve to mocks,
+ * and the state manager and Kerberos settings are simply null.
+ */
+public class MockControllerServiceInitializationContext implements ControllerServiceInitializationContext {
+
+    @Override
+    public String getIdentifier() {
+        return "mock-controller-service";
+    }
+
+    @Override
+    public ControllerServiceLookup getControllerServiceLookup() {
+        return new MockControllerServiceLookup();
+    }
+
+    @Override
+    public ComponentLog getLogger() {
+        return new MockComponentLogger();
+    }
+
+    @Override
+    public StateManager getStateManager() {
+        return null;
+    }
+
+    @Override
+    public String getKerberosServicePrincipal() {
+        return null;
+    }
+
+    @Override
+    public File getKerberosServiceKeytab() {
+        return null;
+    }
+
+    @Override
+    public File getKerberosConfigurationFile() {
+        return null;
+    }
+}
\ No newline at end of file
diff --git a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/mock/MockControllerServiceLookup.java b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/mock/MockControllerServiceLookup.java
new file mode 100644
index 0000000..5307ac4
--- /dev/null
+++ b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/mock/MockControllerServiceLookup.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.mock;
+
+import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.controller.ControllerServiceLookup;
+
+import java.util.Collections;
+import java.util.Set;
+
+/**
+ * A Mock ControllerServiceLookup that can be used so that
+ * ConfigurableComponents can be initialized for the purpose of generating
+ * documentation: no services exist (lookups return null or empty sets),
+ * nothing is reported as enabled or enabling, and a service's "name" is
+ * just the identifier that was asked for.
+ */
+public class MockControllerServiceLookup implements ControllerServiceLookup {
+
+    @Override
+    public ControllerService getControllerService(final String serviceIdentifier) {
+        return null;
+    }
+
+    @Override
+    public boolean isControllerServiceEnabled(final String serviceIdentifier) {
+        return false;
+    }
+
+    @Override
+    public boolean isControllerServiceEnabled(final ControllerService service) {
+        return false;
+    }
+
+    @Override
+    public Set<String> getControllerServiceIdentifiers(final Class<? extends ControllerService> serviceType) throws IllegalArgumentException {
+        return Collections.emptySet();
+    }
+
+    @Override
+    public boolean isControllerServiceEnabling(final String serviceIdentifier) {
+        return false;
+    }
+
+    @Override
+    public String getControllerServiceName(final String serviceIdentifier) {
+        return serviceIdentifier;
+    }
+}
diff --git a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/mock/MockNodeTypeProvider.java b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/mock/MockNodeTypeProvider.java
new file mode 100644
index 0000000..61390e1
--- /dev/null
+++ b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/mock/MockNodeTypeProvider.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.mock;
+
+import org.apache.nifi.controller.NodeTypeProvider;
+
+/**
+ * A Mock NodeTypeProvider that can be used so that
+ * ConfigurableComponents can be initialized for the purpose of generating
+ * documentation; it always reports a non-clustered, non-primary node
+ * (both isClustered() and isPrimary() return false).
+ *
+ */
+public class MockNodeTypeProvider implements NodeTypeProvider {
+
+    @Override
+    public boolean isClustered() {
+        return false;
+    }
+
+    @Override
+    public boolean isPrimary() {
+        return false;
+    }
+
+}
diff --git a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/mock/MockProcessContext.java b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/mock/MockProcessContext.java
new file mode 100644
index 0000000..cf2e2cf
--- /dev/null
+++ b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/mock/MockProcessContext.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.mock;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.components.state.StateManager;
+import org.apache.nifi.controller.ControllerServiceLookup;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.Relationship;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+
+public class MockProcessContext implements ProcessContext {
+
+    @Override
+    public PropertyValue getProperty(PropertyDescriptor descriptor) {
+        return null;
+    }
+
+    @Override
+    public PropertyValue getProperty(String propertyName) {
+        return null;
+    }
+
+    @Override
+    public PropertyValue newPropertyValue(String rawValue) {
+        return null;
+    }
+
+    @Override
+    public void yield() {
+
+    }
+
+    @Override
+    public int getMaxConcurrentTasks() {
+        return 0;
+    }
+
+    @Override
+    public String getAnnotationData() {
+        return "";
+    }
+
+    @Override
+    public Map<PropertyDescriptor, String> getProperties() {
+        return Collections.emptyMap();
+    }
+
+    @Override
+    public String encrypt(String unencrypted) {
+        return unencrypted;
+    }
+
+    @Override
+    public String decrypt(String encrypted) {
+        return encrypted;
+    }
+
+    @Override
+    public ControllerServiceLookup getControllerServiceLookup() {
+        return new MockControllerServiceLookup();
+    }
+
+    @Override
+    public Set<Relationship> getAvailableRelationships() {
+        return Collections.emptySet();
+    }
+
+    @Override
+    public boolean hasIncomingConnection() {
+        return true;
+    }
+
+    @Override
+    public boolean hasNonLoopConnection() {
+        return true;
+    }
+
+    @Override
+    public boolean hasConnection(Relationship relationship) {
+        return false;
+    }
+
+    @Override
+    public boolean isExpressionLanguagePresent(PropertyDescriptor property) {
+        return false;
+    }
+
+    @Override
+    public StateManager getStateManager() {
+        return null;
+    }
+
+    @Override
+    public String getName() {
+        return null;
+    }
+}
\ No newline at end of file
diff --git a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/mock/MockProcessorInitializationContext.java b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/mock/MockProcessorInitializationContext.java
new file mode 100644
index 0000000..d9320b2
--- /dev/null
+++ b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/mock/MockProcessorInitializationContext.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.mock;
+
+import org.apache.nifi.controller.ControllerServiceLookup;
+import org.apache.nifi.controller.NodeTypeProvider;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+
+import java.io.File;
+
+/**
+ * A Mock ProcessorInitializationContext that can be used so that Processors
+ * can be initialized for the purpose of generating documentation: it hands
+ * out the fixed identifier "mock-processor", mock logger/lookup/provider
+ * instances, and null Kerberos settings.
+ */
+public class MockProcessorInitializationContext implements ProcessorInitializationContext {
+
+    @Override
+    public String getIdentifier() {
+        return "mock-processor";
+    }
+
+    @Override
+    public ComponentLog getLogger() {
+        return new MockComponentLogger();
+    }
+
+    @Override
+    public ControllerServiceLookup getControllerServiceLookup() {
+        return new MockControllerServiceLookup();
+    }
+
+    @Override
+    public NodeTypeProvider getNodeTypeProvider() {
+        return new MockNodeTypeProvider();
+    }
+
+    @Override
+    public String getKerberosServicePrincipal() {
+        return null;
+    }
+
+    @Override
+    public File getKerberosServiceKeytab() {
+        return null;
+    }
+
+    @Override
+    public File getKerberosConfigurationFile() {
+        return null;
+    }
+}
diff --git a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/mock/MockReportingInitializationContext.java b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/mock/MockReportingInitializationContext.java
new file mode 100644
index 0000000..630c657
--- /dev/null
+++ b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/mock/MockReportingInitializationContext.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.mock;
+
+import org.apache.nifi.controller.ControllerServiceLookup;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.reporting.ReportingInitializationContext;
+import org.apache.nifi.scheduling.SchedulingStrategy;
+
+import java.io.File;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A Mock ReportingInitializationContext that can be used to initialize a
+ * ReportingTask for the purposes of documentation generation; it supplies a
+ * fixed id, empty name/period, TIMER_DRIVEN strategy, and mock collaborators.
+ */
+public class MockReportingInitializationContext implements ReportingInitializationContext {
+
+    @Override
+    public String getIdentifier() {
+        return "mock-reporting-task";
+    }
+
+    @Override
+    public String getName() {
+        return "";
+    }
+
+    @Override
+    public long getSchedulingPeriod(TimeUnit timeUnit) {
+        return 0;
+    }
+
+    @Override
+    public ControllerServiceLookup getControllerServiceLookup() {
+        return new MockControllerServiceLookup();
+    }
+
+    @Override
+    public String getSchedulingPeriod() {
+        return "";
+    }
+
+    @Override
+    public SchedulingStrategy getSchedulingStrategy() {
+        return SchedulingStrategy.TIMER_DRIVEN;
+    }
+
+    @Override
+    public ComponentLog getLogger() {
+        return new MockComponentLogger();
+    }
+
+    @Override
+    public String getKerberosServicePrincipal() {
+        return null;
+    }
+
+    @Override
+    public File getKerberosServiceKeytab() {
+        return null;
+    }
+
+    @Override
+    public File getKerberosConfigurationFile() {
+        return null;
+    }
+}
diff --git a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/ExtensionManager.java b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/ExtensionManager.java
index 9fd9e66..b80207b 100644
--- a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/ExtensionManager.java
+++ b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/ExtensionManager.java
@@ -19,14 +19,22 @@
 import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
 import org.apache.nifi.authentication.LoginIdentityProvider;
 import org.apache.nifi.authorization.Authorizer;
+import org.apache.nifi.bundle.Bundle;
+import org.apache.nifi.bundle.BundleCoordinate;
+import org.apache.nifi.components.ConfigurableComponent;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.StateProvider;
 import org.apache.nifi.controller.ControllerService;
 import org.apache.nifi.controller.repository.ContentRepository;
 import org.apache.nifi.controller.repository.FlowFileRepository;
 import org.apache.nifi.controller.repository.FlowFileSwapManager;
 import org.apache.nifi.controller.status.history.ComponentStatusRepository;
 import org.apache.nifi.flowfile.FlowFilePrioritizer;
+import org.apache.nifi.init.ConfigurableComponentInitializer;
+import org.apache.nifi.init.ConfigurableComponentInitializerFactory;
 import org.apache.nifi.processor.Processor;
 import org.apache.nifi.provenance.ProvenanceRepository;
+import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.reporting.ReportingTask;
 import org.apache.nifi.util.StringUtils;
 import org.slf4j.Logger;
@@ -35,13 +43,17 @@
 import java.io.IOException;
 import java.net.URL;
 import java.net.URLClassLoader;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.ServiceLoader;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
 
 /**
  * Scans through the classpath to load all FlowFileProcessors, FlowFileComparators, and ReportingTasks using the service provider API and running through all classloaders (root, NARs).
@@ -56,10 +68,13 @@
     // Maps a service definition (interface) to those classes that implement the interface
     private static final Map<Class, Set<Class>> definitionMap = new HashMap<>();
 
-    private static final Map<String, ClassLoader> extensionClassloaderLookup = new HashMap<>();
+    private static final Map<String, List<Bundle>> classNameBundleLookup = new HashMap<>();
+    private static final Map<BundleCoordinate, Bundle> bundleCoordinateBundleLookup = new HashMap<>();
+    private static final Map<ClassLoader, Bundle> classLoaderBundleLookup = new HashMap<>();
+    private static final Map<String, ConfigurableComponent> tempComponentLookup = new HashMap<>();
 
-    private static final Set<String> requiresInstanceClassLoading = new HashSet<>();
-    private static final Map<String, ClassLoader> instanceClassloaderLookup = new ConcurrentHashMap<>();
+    private static final Map<String, Class<?>> requiresInstanceClassLoading = new HashMap<>();
+    private static final Map<String, InstanceClassLoader> instanceClassloaderLookup = new ConcurrentHashMap<>();
 
     static {
         definitionMap.put(Processor.class, new HashSet<>());
@@ -73,28 +88,33 @@
         definitionMap.put(FlowFileRepository.class, new HashSet<>());
         definitionMap.put(FlowFileSwapManager.class, new HashSet<>());
         definitionMap.put(ContentRepository.class, new HashSet<>());
+        definitionMap.put(StateProvider.class, new HashSet<>());
     }
 
     /**
      * Loads all FlowFileProcessor, FlowFileComparator, ReportingTask class types that can be found on the bootstrap classloader and by creating classloaders for all NARs found within the classpath.
-     * @param extensionLoaders the loaders to scan through in search of extensions
+     *
+     * @param narBundles the bundles to scan through in search of extensions
      */
-    public static void discoverExtensions(final Set<ClassLoader> extensionLoaders) {
-        final ClassLoader systemClassLoader = ClassLoader.getSystemClassLoader();
-
+    public static void discoverExtensions(final Bundle systemBundle, final Set<Bundle> narBundles) {
         // get the current context class loader
         ClassLoader currentContextClassLoader = Thread.currentThread().getContextClassLoader();
 
-        // consider the system class loader
-        loadExtensions(systemClassLoader);
+        // load the system bundle first so that any extensions found in JARs directly in lib will be registered as
+        // being from the system bundle and not from all the other NARs
+        loadExtensions(systemBundle);
+        bundleCoordinateBundleLookup.put(systemBundle.getBundleDetails().getCoordinate(), systemBundle);
 
         // consider each nar class loader
-        for (final ClassLoader ncl : extensionLoaders) {
-
+        for (final Bundle bundle : narBundles) {
             // Must set the context class loader to the nar classloader itself
             // so that static initialization techniques that depend on the context class loader will work properly
+            final ClassLoader ncl = bundle.getClassLoader();
             Thread.currentThread().setContextClassLoader(ncl);
-            loadExtensions(ncl);
+            loadExtensions(bundle);
+
+            // Create a look-up from coordinate to bundle
+            bundleCoordinateBundleLookup.put(bundle.getBundleDetails().getCoordinate(), bundle);
         }
 
         // restore the current context class loader if appropriate
@@ -104,160 +124,409 @@
     }
 
     /**
-     * Loads extensions from the specified class loader.
+     * Loads extensions from the specified bundle.
      *
-     * @param classLoader from which to load extensions
+     * @param bundle from which to load extensions
      */
     @SuppressWarnings("unchecked")
-    private static void loadExtensions(final ClassLoader classLoader) {
+    private static void loadExtensions(final Bundle bundle) {
         for (final Map.Entry<Class, Set<Class>> entry : definitionMap.entrySet()) {
-            final ServiceLoader<?> serviceLoader = ServiceLoader.load(entry.getKey(), classLoader);
+            final boolean isControllerService = ControllerService.class.equals(entry.getKey());
+            final boolean isProcessor = Processor.class.equals(entry.getKey());
+            final boolean isReportingTask = ReportingTask.class.equals(entry.getKey());
 
+            final ServiceLoader<?> serviceLoader = ServiceLoader.load(entry.getKey(), bundle.getClassLoader());
             for (final Object o : serviceLoader) {
-                registerServiceClass(o.getClass(), extensionClassloaderLookup, classLoader, entry.getValue());
+                // create a cache of temp ConfigurableComponent instances, the initialize here has to happen before the checks below
+                if ((isControllerService || isProcessor || isReportingTask) && o instanceof ConfigurableComponent) {
+                    final ConfigurableComponent configurableComponent = (ConfigurableComponent) o;
+                    initializeTempComponent(configurableComponent);
+
+                    final String cacheKey = getClassBundleKey(o.getClass().getCanonicalName(), bundle.getBundleDetails().getCoordinate());
+                    tempComponentLookup.put(cacheKey, (ConfigurableComponent) o);
+                }
+
+                // only consider extensions discovered directly in this bundle
+                boolean registerExtension = bundle.getClassLoader().equals(o.getClass().getClassLoader());
+
+                if (registerExtension) {
+                    final Class extensionType = o.getClass();
+                    if (isControllerService && !checkControllerServiceEligibility(extensionType)) {
+                        registerExtension = false;
+                        logger.error(String.format(
+                                "Skipping Controller Service %s because it is bundled with its supporting APIs and requires instance class loading.", extensionType.getName()));
+                    }
+
+                    final boolean canReferenceControllerService = (isControllerService || isProcessor || isReportingTask) && o instanceof ConfigurableComponent;
+                    if (canReferenceControllerService && !checkControllerServiceReferenceEligibility((ConfigurableComponent) o, bundle.getClassLoader())) {
+                        registerExtension = false;
+                        logger.error(String.format(
+                                "Skipping component %s because it is bundled with its referenced Controller Service APIs and requires instance class loading.", extensionType.getName()));
+                    }
+
+                    if (registerExtension) {
+                        registerServiceClass(o.getClass(), classNameBundleLookup, bundle, entry.getValue());
+                    }
+                }
+
             }
+
+            classLoaderBundleLookup.put(bundle.getClassLoader(), bundle);
         }
     }
 
-    /**
-     * Registers extension for the specified type from the specified ClassLoader.
-     *
-     * @param type the extension type
-     * @param classloaderMap mapping of classname to classloader
-     * @param classLoader the classloader being mapped to
-     * @param classes to map to this classloader but which come from its ancestors
-     */
-    private static void registerServiceClass(final Class<?> type, final Map<String, ClassLoader> classloaderMap, final ClassLoader classLoader, final Set<Class> classes) {
-        final String className = type.getName();
-        final ClassLoader registeredClassLoader = classloaderMap.get(className);
+    private static void initializeTempComponent(final ConfigurableComponent configurableComponent) {
+        ConfigurableComponentInitializer initializer = null;
+        try {
+            initializer = ConfigurableComponentInitializerFactory.createComponentInitializer(configurableComponent.getClass());
+            initializer.initialize(configurableComponent);
+        } catch (final InitializationException e) {
+            logger.warn(String.format("Unable to initialize component %s due to %s", configurableComponent.getClass().getName(), e.getMessage()));
+        }
+    }
 
-        // see if this class is already registered (this should happen when the class is loaded by an ancestor of the specified classloader)
-        if (registeredClassLoader == null) {
-            classloaderMap.put(className, classLoader);
+    private static boolean checkControllerServiceReferenceEligibility(final ConfigurableComponent component, final ClassLoader classLoader) {
+        // if the extension does not require instance classloading, it's eligible
+        final boolean requiresInstanceClassLoading = component.getClass().isAnnotationPresent(RequiresInstanceClassLoading.class);
+
+        final Set<Class> cobundledApis = new HashSet<>();
+        try (final NarCloseable closeable = NarCloseable.withComponentNarLoader(component.getClass().getClassLoader())) {
+            final List<PropertyDescriptor> descriptors = component.getPropertyDescriptors();
+            if (descriptors != null && !descriptors.isEmpty()) {
+                for (final PropertyDescriptor descriptor : descriptors) {
+                    final Class<? extends ControllerService> serviceApi = descriptor.getControllerServiceDefinition();
+                    if (serviceApi != null && classLoader.equals(serviceApi.getClassLoader())) {
+                        cobundledApis.add(serviceApi);
+                    }
+                }
+            }
+        }
+
+        if (!cobundledApis.isEmpty()) {
+            logger.warn(String.format(
+                    "Component %s is bundled with its referenced Controller Service APIs %s. The service APIs should not be bundled with component implementations that reference it.",
+                    component.getClass().getName(), StringUtils.join(cobundledApis.stream().map(cls -> cls.getName()).collect(Collectors.toSet()), ", ")));
+        }
+
+        // the component is eligible when it does not require instance classloading or when the supporting APIs are bundled in a parent NAR
+        return requiresInstanceClassLoading == false || cobundledApis.isEmpty();
+    }
+
+    private static boolean checkControllerServiceEligibility(Class extensionType) {
+        final Class originalExtensionType = extensionType;
+        final ClassLoader originalExtensionClassLoader = extensionType.getClassLoader();
+
+        // if the extension does not require instance classloading, it's eligible
+        final boolean requiresInstanceClassLoading = extensionType.isAnnotationPresent(RequiresInstanceClassLoading.class);
+
+        final Set<Class> cobundledApis = new HashSet<>();
+        while (extensionType != null) {
+            for (final Class i : extensionType.getInterfaces()) {
+                if (originalExtensionClassLoader.equals(i.getClassLoader())) {
+                    cobundledApis.add(i);
+                }
+            }
+
+            extensionType = extensionType.getSuperclass();
+        }
+
+        if (!cobundledApis.isEmpty()) {
+            logger.warn(String.format("Controller Service %s is bundled with its supporting APIs %s. The service APIs should not be bundled with the implementations.",
+                    originalExtensionType.getName(), StringUtils.join(cobundledApis.stream().map(cls -> cls.getName()).collect(Collectors.toSet()), ", ")));
+        }
+
+        // the service is eligible when it does not require instance classloading or when the supporting APIs are bundled in a parent NAR
+        return requiresInstanceClassLoading == false || cobundledApis.isEmpty();
+    }
+
+    /**
+     * Registers extension for the specified type from the specified Bundle.
+     *
+     * @param type               the extension type
+     * @param classNameBundleMap mapping of classname to Bundle
+     * @param bundle             the Bundle being mapped to
+     * @param classes            to map to this classloader but which come from its ancestors
+     */
+    private static void registerServiceClass(final Class<?> type, final Map<String, List<Bundle>> classNameBundleMap, final Bundle bundle, final Set<Class> classes) {
+        final String className = type.getName();
+
+        // get the bundles that have already been registered for the class name
+        List<Bundle> registeredBundles = classNameBundleMap.get(className);
+
+        if (registeredBundles == null) {
+            registeredBundles = new ArrayList<>();
+            classNameBundleMap.put(className, registeredBundles);
+        }
+
+        boolean alreadyRegistered = false;
+        for (final Bundle registeredBundle : registeredBundles) {
+            final BundleCoordinate registeredCoordinate = registeredBundle.getBundleDetails().getCoordinate();
+
+            // if the incoming bundle has the same coordinate as one of the registered bundles then consider it already registered
+            if (registeredCoordinate.equals(bundle.getBundleDetails().getCoordinate())) {
+                alreadyRegistered = true;
+                break;
+            }
+
+            // if the type wasn't loaded from an ancestor, and the type isn't a processor, cs, or reporting task, then
+            // fail registration because we don't support multiple versions of any other types
+            if (!multipleVersionsAllowed(type)) {
+                throw new IllegalStateException("Attempt was made to load " + className + " from "
+                        + bundle.getBundleDetails().getCoordinate().getCoordinate()
+                        + " but that class name is already loaded/registered from " + registeredBundle.getBundleDetails().getCoordinate()
+                        + " and multiple versions are not supported for this type"
+                );
+            }
+        }
+
+        // if none of the above was true then register the new bundle
+        if (!alreadyRegistered) {
+            registeredBundles.add(bundle);
             classes.add(type);
 
-            // keep track of which classes require a class loader per component instance
             if (type.isAnnotationPresent(RequiresInstanceClassLoading.class)) {
-                requiresInstanceClassLoading.add(className);
-            }
-
-        } else {
-            boolean loadedFromAncestor = false;
-
-            // determine if this class was loaded from an ancestor
-            ClassLoader ancestorClassLoader = classLoader.getParent();
-            while (ancestorClassLoader != null) {
-                if (ancestorClassLoader == registeredClassLoader) {
-                    loadedFromAncestor = true;
-                    break;
-                }
-                ancestorClassLoader = ancestorClassLoader.getParent();
-            }
-
-            // if this class was loaded from a non ancestor class loader, report potential unexpected behavior
-            if (!loadedFromAncestor) {
-                logger.warn("Attempt was made to load " + className + " from " + classLoader
-                        + " but that class name is already loaded/registered from " + registeredClassLoader
-                        + ".  This may cause unpredictable behavior.  Order of NARs is not guaranteed.");
+                final String cacheKey = getClassBundleKey(className, bundle.getBundleDetails().getCoordinate());
+                requiresInstanceClassLoading.put(cacheKey, type);
             }
         }
+
     }
 
     /**
-     * Determines the effective classloader for classes of the given type. If returns null it indicates the given type is not known or was not detected.
-     *
-     * @param classType to lookup the classloader of
-     * @return String of fully qualified class name; null if not a detected type
+     * @param type a Class that we found from a service loader
+     * @return true if the given class is a processor, controller service, or reporting task
      */
-    public static ClassLoader getClassLoader(final String classType) {
-        return extensionClassloaderLookup.get(classType);
+    private static boolean multipleVersionsAllowed(Class<?> type) {
+        return Processor.class.isAssignableFrom(type) || ControllerService.class.isAssignableFrom(type) || ReportingTask.class.isAssignableFrom(type);
     }
 
     /**
      * Determines the effective ClassLoader for the instance of the given type.
      *
-     * @param classType the type of class to lookup the ClassLoader for
+     * @param classType          the type of class to lookup the ClassLoader for
      * @param instanceIdentifier the identifier of the specific instance of the classType to look up the ClassLoader for
+     * @param bundle             the bundle where the classType exists
+     * @param additionalUrls     additional URLs to add to the instance class loader
      * @return the ClassLoader for the given instance of the given type, or null if the type is not a detected extension type
      */
-    public static ClassLoader getClassLoader(final String classType, final String instanceIdentifier) {
-        if (StringUtils.isEmpty(classType) || StringUtils.isEmpty(instanceIdentifier)) {
-            throw new IllegalArgumentException("Class Type and Instance Identifier must be provided");
+    public static InstanceClassLoader createInstanceClassLoader(final String classType, final String instanceIdentifier, final Bundle bundle, final Set<URL> additionalUrls) {
+        if (StringUtils.isEmpty(classType)) {
+            throw new IllegalArgumentException("Class-Type is required");
         }
 
-        // Check if we already have a ClassLoader for this instance
-        ClassLoader instanceClassLoader = instanceClassloaderLookup.get(instanceIdentifier);
-
-        // If we don't then we'll create a new ClassLoader for this instance and add it to the map for future lookups
-        if (instanceClassLoader == null) {
-            final ClassLoader registeredClassLoader = getClassLoader(classType);
-            if (registeredClassLoader == null) {
-                return null;
-            }
-
-            // If the class is annotated with @RequiresInstanceClassLoading and the registered ClassLoader is a URLClassLoader
-            // then make a new InstanceClassLoader that is a full copy of the NAR Class Loader, otherwise create an empty
-            // InstanceClassLoader that has the NAR ClassLoader as a parent
-            if (requiresInstanceClassLoading.contains(classType) && (registeredClassLoader instanceof URLClassLoader)) {
-                final URLClassLoader registeredUrlClassLoader = (URLClassLoader) registeredClassLoader;
-                instanceClassLoader = new InstanceClassLoader(instanceIdentifier, classType, registeredUrlClassLoader.getURLs(), registeredUrlClassLoader.getParent());
-            } else {
-                instanceClassLoader = new InstanceClassLoader(instanceIdentifier, classType, new URL[0], registeredClassLoader);
-            }
-
-            instanceClassloaderLookup.put(instanceIdentifier, instanceClassLoader);
+        if (StringUtils.isEmpty(instanceIdentifier)) {
+            throw new IllegalArgumentException("Instance Identifier is required");
         }
 
+        if (bundle == null) {
+            throw new IllegalArgumentException("Bundle is required");
+        }
+
+        // If the class is annotated with @RequiresInstanceClassLoading and the registered ClassLoader is a URLClassLoader
+        // then make a new InstanceClassLoader that is a full copy of the NAR Class Loader, otherwise create an empty
+        // InstanceClassLoader that has the NAR ClassLoader as a parent
+
+        InstanceClassLoader instanceClassLoader;
+        final ClassLoader bundleClassLoader = bundle.getClassLoader();
+        final String key = getClassBundleKey(classType, bundle.getBundleDetails().getCoordinate());
+
+        if (requiresInstanceClassLoading.containsKey(key) && bundleClassLoader instanceof NarClassLoader) {
+            final Class<?> type = requiresInstanceClassLoading.get(key);
+            final RequiresInstanceClassLoading requiresInstanceClassLoading = type.getAnnotation(RequiresInstanceClassLoading.class);
+
+            final NarClassLoader narBundleClassLoader = (NarClassLoader) bundleClassLoader;
+            logger.debug("Including ClassLoader resources from {} for component {}", new Object[]{bundle.getBundleDetails(), instanceIdentifier});
+
+            final Set<URL> instanceUrls = new LinkedHashSet<>();
+            for (final URL url : narBundleClassLoader.getURLs()) {
+                instanceUrls.add(url);
+            }
+
+            ClassLoader ancestorClassLoader = narBundleClassLoader.getParent();
+
+            if (requiresInstanceClassLoading.cloneAncestorResources()) {
+                final ConfigurableComponent component = getTempComponent(classType, bundle.getBundleDetails().getCoordinate());
+                final Set<BundleCoordinate> reachableApiBundles = findReachableApiBundles(component);
+
+                while (ancestorClassLoader != null && ancestorClassLoader instanceof NarClassLoader) {
+                    final Bundle ancestorNarBundle = classLoaderBundleLookup.get(ancestorClassLoader);
+
+                    // stop including ancestor resources when we reach one of the APIs
+                    if (ancestorNarBundle == null || reachableApiBundles.contains(ancestorNarBundle.getBundleDetails().getCoordinate())) {
+                        break;
+                    }
+
+                    final NarClassLoader ancestorNarClassLoader = (NarClassLoader) ancestorClassLoader;
+                    for (final URL url : ancestorNarClassLoader.getURLs()) {
+                        instanceUrls.add(url);
+                    }
+                    ancestorClassLoader = ancestorNarClassLoader.getParent();
+                }
+            }
+
+            instanceClassLoader = new InstanceClassLoader(instanceIdentifier, classType, instanceUrls, additionalUrls, ancestorClassLoader);
+        } else {
+            instanceClassLoader = new InstanceClassLoader(instanceIdentifier, classType, Collections.emptySet(), additionalUrls, bundleClassLoader);
+        }
+
+        if (logger.isTraceEnabled()) {
+            for (URL url : instanceClassLoader.getURLs()) {
+                logger.trace("URL resource {} for {}...", new Object[]{url.toExternalForm(), instanceIdentifier});
+            }
+        }
+
+        instanceClassloaderLookup.put(instanceIdentifier, instanceClassLoader);
         return instanceClassLoader;
     }
 
     /**
-     * Removes the ClassLoader for the given instance and closes it if necessary.
+     * Find the bundle coordinates for any service APIs that are referenced by this component and not part of the same bundle.
      *
-     * @param instanceIdentifier the identifier of a component to remove the ClassLoader for
-     * @return the removed ClassLoader for the given instance, or null if not found
+     * @param component the component being instantiated
      */
-    public static ClassLoader removeInstanceClassLoaderIfExists(final String instanceIdentifier) {
+    protected static Set<BundleCoordinate> findReachableApiBundles(final ConfigurableComponent component) {
+        final Set<BundleCoordinate> reachableApiBundles = new HashSet<>();
+
+        try (final NarCloseable closeable = NarCloseable.withComponentNarLoader(component.getClass().getClassLoader())) {
+            final List<PropertyDescriptor> descriptors = component.getPropertyDescriptors();
+            if (descriptors != null && !descriptors.isEmpty()) {
+                for (final PropertyDescriptor descriptor : descriptors) {
+                    final Class<? extends ControllerService> serviceApi = descriptor.getControllerServiceDefinition();
+                    if (serviceApi != null && !component.getClass().getClassLoader().equals(serviceApi.getClassLoader())) {
+                        final Bundle apiBundle = classLoaderBundleLookup.get(serviceApi.getClassLoader());
+                        reachableApiBundles.add(apiBundle.getBundleDetails().getCoordinate());
+                    }
+                }
+            }
+        }
+
+        return reachableApiBundles;
+    }
+
+    /**
+     * Retrieves the InstanceClassLoader for the component with the given identifier.
+     *
+     * @param instanceIdentifier the identifier of a component
+     * @return the instance class loader for the component
+     */
+    public static InstanceClassLoader getInstanceClassLoader(final String instanceIdentifier) {
+        return instanceClassloaderLookup.get(instanceIdentifier);
+    }
+
+    /**
+     * Removes the InstanceClassLoader for a given component.
+     *
+     * @param instanceIdentifier the identifier of a component
+     */
+    public static InstanceClassLoader removeInstanceClassLoader(final String instanceIdentifier) {
         if (instanceIdentifier == null) {
             return null;
         }
 
-        final ClassLoader classLoader = instanceClassloaderLookup.remove(instanceIdentifier);
+        final InstanceClassLoader classLoader = instanceClassloaderLookup.remove(instanceIdentifier);
+        closeURLClassLoader(instanceIdentifier, classLoader);
+        return classLoader;
+    }
+
+    /**
+     * Closes the given ClassLoader if it is an instance of URLClassLoader.
+     *
+     * @param instanceIdentifier the instance id the class loader corresponds to
+     * @param classLoader        the class loader to close
+     */
+    public static void closeURLClassLoader(final String instanceIdentifier, final ClassLoader classLoader) {
         if (classLoader != null && (classLoader instanceof URLClassLoader)) {
             final URLClassLoader urlClassLoader = (URLClassLoader) classLoader;
             try {
                 urlClassLoader.close();
             } catch (IOException e) {
-                logger.warn("Unable to class URLClassLoader for " + instanceIdentifier);
+                logger.warn("Unable to close URLClassLoader for " + instanceIdentifier);
             }
         }
-        return classLoader;
     }
 
     /**
-     * Checks if the given class type requires per-instance class loading (i.e. contains the @RequiresInstanceClassLoading annotation)
+     * Retrieves the bundles that have a class with the given name.
      *
-     * @param classType the class to check
-     * @return true if the class is found in the set of classes requiring instance level class loading, false otherwise
+     * @param classType the class name of an extension
+     * @return the list of bundles that contain an extension with the given class name
      */
-    public static boolean requiresInstanceClassLoading(final String classType) {
-        return requiresInstanceClassLoading.contains(classType);
+    public static List<Bundle> getBundles(final String classType) {
+        if (classType == null) {
+            throw new IllegalArgumentException("Class type cannot be null");
+        }
+        final List<Bundle> bundles = classNameBundleLookup.get(classType);
+        return bundles == null ? Collections.emptyList() : new ArrayList<>(bundles);
+    }
+
+    /**
+     * Retrieves the bundle with the given coordinate.
+     *
+     * @param bundleCoordinate a coordinate to look up
+     * @return the bundle with the given coordinate, or null if none exists
+     */
+    public static Bundle getBundle(final BundleCoordinate bundleCoordinate) {
+        if (bundleCoordinate == null) {
+            throw new IllegalArgumentException("BundleCoordinate cannot be null");
+        }
+        return bundleCoordinateBundleLookup.get(bundleCoordinate);
+    }
+
+    /**
+     * Retrieves the bundle for the given class loader.
+     *
+     * @param classLoader the class loader to look up the bundle for
+     * @return the bundle for the given class loader
+     */
+    public static Bundle getBundle(final ClassLoader classLoader) {
+        if (classLoader == null) {
+            throw new IllegalArgumentException("ClassLoader cannot be null");
+        }
+        return classLoaderBundleLookup.get(classLoader);
     }
 
     public static Set<Class> getExtensions(final Class<?> definition) {
+        if (definition == null) {
+            throw new IllegalArgumentException("Class cannot be null");
+        }
         final Set<Class> extensions = definitionMap.get(definition);
         return (extensions == null) ? Collections.<Class>emptySet() : extensions;
     }
 
+    public static ConfigurableComponent getTempComponent(final String classType, final BundleCoordinate bundleCoordinate) {
+        if (classType == null) {
+            throw new IllegalArgumentException("Class type cannot be null");
+        }
+
+        if (bundleCoordinate == null) {
+            throw new IllegalArgumentException("Bundle Coordinate cannot be null");
+        }
+
+        return tempComponentLookup.get(getClassBundleKey(classType, bundleCoordinate));
+    }
+
+    private static String getClassBundleKey(final String classType, final BundleCoordinate bundleCoordinate) {
+        return classType + "_" + bundleCoordinate.getCoordinate();
+    }
+
     public static void logClassLoaderMapping() {
         final StringBuilder builder = new StringBuilder();
 
-        builder.append("Extension Type Mapping to Classloader:");
+        builder.append("Extension Type Mapping to Bundle:");
         for (final Map.Entry<Class, Set<Class>> entry : definitionMap.entrySet()) {
-            builder.append("\n\t=== ").append(entry.getKey().getSimpleName()).append(" type || Classloader ===");
+            builder.append("\n\t=== ").append(entry.getKey().getSimpleName()).append(" Type ===");
 
             for (final Class type : entry.getValue()) {
-                builder.append("\n\t").append(type.getName()).append(" || ").append(getClassLoader(type.getName()));
+                final List<Bundle> bundles = classNameBundleLookup.containsKey(type.getName())
+                        ? classNameBundleLookup.get(type.getName()) : Collections.emptyList();
+
+                builder.append("\n\t").append(type.getName());
+
+                for (final Bundle bundle : bundles) {
+                    final String coordinate = bundle.getBundleDetails().getCoordinate().getCoordinate();
+                    final String workingDir = bundle.getBundleDetails().getWorkingDirectory().getPath();
+                    builder.append("\n\t\t").append(coordinate).append(" || ").append(workingDir);
+                }
             }
 
             builder.append("\n\t=== End ").append(entry.getKey().getSimpleName()).append(" types ===");
diff --git a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/InstanceClassLoader.java b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/InstanceClassLoader.java
index 8aff08f..d9e23fa 100644
--- a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/InstanceClassLoader.java
+++ b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/InstanceClassLoader.java
@@ -19,16 +19,17 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.net.URL;
 import java.net.URLClassLoader;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.Set;
 
 /**
- * A ClassLoader created for an instance of a component which lets a client add resources to an intermediary ClassLoader
- * that will be checked first when loading/finding classes.
+ * Each processor, controller service, and reporting task will have an InstanceClassLoader.
  *
- * Typically an instance of this ClassLoader will be created by passing in the URLs and parent from a NARClassLoader in
- * order to create a copy of the NARClassLoader without modifying it.
+ * The InstanceClassLoader will either be an empty pass-through to the NARClassLoader, or will contain a
+ * copy of all the NAR's resources in the case of components that use @RequiresInstanceClassLoading.
  */
 public class InstanceClassLoader extends URLClassLoader {
 
@@ -36,125 +37,53 @@
 
     private final String identifier;
     private final String instanceType;
-    private ShimClassLoader shimClassLoader;
+
+    private final Set<URL> instanceUrls;
+    private final Set<URL> additionalResourceUrls;
 
     /**
      * @param identifier the id of the component this ClassLoader was created for
-     * @param urls the URLs for the ClassLoader
+     * @param instanceUrls the urls for the instance, will either be empty or a copy of the NARs urls
+     * @param additionalResourceUrls the urls that came from runtime properties of the component
      * @param parent the parent ClassLoader
      */
-    public InstanceClassLoader(final String identifier, final String type, final URL[] urls, final ClassLoader parent) {
-        super(urls, parent);
+    public InstanceClassLoader(final String identifier, final String type, final Set<URL> instanceUrls, final Set<URL> additionalResourceUrls, final ClassLoader parent) {
+        super(combineURLs(instanceUrls, additionalResourceUrls), parent);
         this.identifier = identifier;
         this.instanceType = type;
+        this.instanceUrls = Collections.unmodifiableSet(
+                instanceUrls == null ? Collections.emptySet() : new LinkedHashSet<>(instanceUrls));
+        this.additionalResourceUrls = Collections.unmodifiableSet(
+                additionalResourceUrls == null ? Collections.emptySet() : new LinkedHashSet<>(additionalResourceUrls));
     }
 
-    /**
-     * Initializes a new ShimClassLoader for the provided resources, closing the previous ShimClassLoader if one existed.
-     *
-     * @param urls the URLs for the ShimClassLoader
-     * @throws IOException if the previous ShimClassLoader existed and couldn't be closed
-     */
-    public synchronized void setInstanceResources(final URL[] urls) {
-        if (shimClassLoader != null) {
-            try {
-                shimClassLoader.close();
-            } catch (IOException e) {
-                logger.warn("Unable to close inner URLClassLoader for " + identifier);
-            }
+    private static URL[] combineURLs(final Set<URL> instanceUrls, final Set<URL> additionalResourceUrls) {
+        final Set<URL> allUrls = new LinkedHashSet<>();
+
+        if (instanceUrls != null) {
+            allUrls.addAll(instanceUrls);
         }
 
-        shimClassLoader = new ShimClassLoader(urls, getParent());
+        if (additionalResourceUrls != null) {
+            allUrls.addAll(additionalResourceUrls);
+        }
+
+        return allUrls.toArray(new URL[allUrls.size()]);
     }
 
-    /**
-     * @return the URLs for the instance resources that have been set
-     */
-    public synchronized URL[] getInstanceResources() {
-        if (shimClassLoader != null) {
-            return shimClassLoader.getURLs();
-        }
-        return new URL[0];
+    public String getIdentifier() {
+        return identifier;
     }
 
-    @Override
-    public Class<?> loadClass(String name) throws ClassNotFoundException {
-        return this.loadClass(name, false);
+    public String getInstanceType() {
+        return instanceType;
     }
 
-    @Override
-    protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
-        Class<?> c = null;
-        // first try the shim
-        if (shimClassLoader != null) {
-            try {
-                c = shimClassLoader.loadClass(name, resolve);
-            } catch (ClassNotFoundException e) {
-                c = null;
-            }
-        }
-        // if it wasn't in the shim try our self
-        if (c == null) {
-            return super.loadClass(name, resolve);
-        } else {
-            return c;
-        }
+    public Set<URL> getInstanceUrls() {
+        return instanceUrls;
     }
 
-    @Override
-    protected Class<?> findClass(String name) throws ClassNotFoundException {
-        Class<?> c = null;
-        // first try the shim
-        if (shimClassLoader != null) {
-            try {
-                c = shimClassLoader.findClass(name);
-            } catch (ClassNotFoundException cnf) {
-                c = null;
-            }
-        }
-        // if it wasn't in the shim try our self
-        if (c == null) {
-            return super.findClass(name);
-        } else {
-            return c;
-        }
+    public Set<URL> getAdditionalResourceUrls() {
+        return additionalResourceUrls;
     }
-
-    @Override
-    public void close() throws IOException {
-        if (shimClassLoader != null) {
-            try {
-                shimClassLoader.close();
-            } catch (IOException e) {
-                logger.warn("Unable to close inner URLClassLoader for " + identifier);
-            }
-        }
-        super.close();
-    }
-
-    /**
-     * Extend URLClassLoader to increase visibility of protected methods so that InstanceClassLoader can delegate.
-     */
-    private static class ShimClassLoader extends URLClassLoader {
-
-        public ShimClassLoader(URL[] urls, ClassLoader parent) {
-            super(urls, parent);
-        }
-
-        public ShimClassLoader(URL[] urls) {
-            super(urls);
-        }
-
-        @Override
-        public Class<?> findClass(String name) throws ClassNotFoundException {
-            return super.findClass(name);
-        }
-
-        @Override
-        public Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
-            return super.loadClass(name, resolve);
-        }
-
-    }
-
-}
\ No newline at end of file
+}
diff --git a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/NarBundleUtil.java b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/NarBundleUtil.java
new file mode 100644
index 0000000..62afb37
--- /dev/null
+++ b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/NarBundleUtil.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.nar;
+
+import org.apache.nifi.bundle.BundleCoordinate;
+import org.apache.nifi.bundle.BundleDetails;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.jar.Attributes;
+import java.util.jar.Manifest;
+
+public class NarBundleUtil {
+
+    /**
+     * Creates a BundleDetails from the given NAR working directory.
+     *
+     * @param narDirectory the directory of an exploded NAR which contains a META-INF/MANIFEST.MF
+     *
+     * @return the BundleDetails constructed from the information in META-INF/MANIFEST.MF
+     */
+    public static BundleDetails fromNarDirectory(final File narDirectory) throws IOException, IllegalStateException {
+        if (narDirectory == null) {
+            throw new IllegalArgumentException("NAR Directory cannot be null");
+        }
+
+        final File manifestFile = new File(narDirectory, "META-INF/MANIFEST.MF");
+        try (final FileInputStream fis = new FileInputStream(manifestFile)) {
+            final Manifest manifest = new Manifest(fis);
+            final Attributes attributes = manifest.getMainAttributes();
+
+            final BundleDetails.Builder builder = new BundleDetails.Builder();
+            builder.workingDir(narDirectory);
+
+            final String group = attributes.getValue(NarManifestEntry.NAR_GROUP.getManifestName());
+            final String id = attributes.getValue(NarManifestEntry.NAR_ID.getManifestName());
+            final String version = attributes.getValue(NarManifestEntry.NAR_VERSION.getManifestName());
+            builder.coordinate(new BundleCoordinate(group, id, version));
+
+            final String dependencyGroup = attributes.getValue(NarManifestEntry.NAR_DEPENDENCY_GROUP.getManifestName());
+            final String dependencyId = attributes.getValue(NarManifestEntry.NAR_DEPENDENCY_ID.getManifestName());
+            final String dependencyVersion = attributes.getValue(NarManifestEntry.NAR_DEPENDENCY_VERSION.getManifestName());
+            if (!StringUtils.isBlank(dependencyId)) {
+                builder.dependencyCoordinate(new BundleCoordinate(dependencyGroup, dependencyId, dependencyVersion));
+            }
+
+            builder.buildBranch(attributes.getValue(NarManifestEntry.BUILD_BRANCH.getManifestName()));
+            builder.buildTag(attributes.getValue(NarManifestEntry.BUILD_TAG.getManifestName()));
+            builder.buildRevision(attributes.getValue(NarManifestEntry.BUILD_REVISION.getManifestName()));
+            builder.buildTimestamp(attributes.getValue(NarManifestEntry.BUILD_TIMESTAMP.getManifestName()));
+            builder.buildJdk(attributes.getValue(NarManifestEntry.BUILD_JDK.getManifestName()));
+            builder.builtBy(attributes.getValue(NarManifestEntry.BUILT_BY.getManifestName()));
+
+            return builder.build();
+        }
+    }
+
+}
diff --git a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/NarClassLoaders.java b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/NarClassLoaders.java
index 9219721..7e8ba89 100644
--- a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/NarClassLoaders.java
+++ b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/NarClassLoaders.java
@@ -16,8 +16,14 @@
  */
 package org.apache.nifi.nar;
 
+import org.apache.nifi.bundle.Bundle;
+import org.apache.nifi.bundle.BundleCoordinate;
+import org.apache.nifi.bundle.BundleDetails;
+import org.apache.nifi.util.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.File;
-import java.io.FileInputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -28,14 +34,10 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.jar.Attributes;
-import java.util.jar.Manifest;
-import org.apache.nifi.util.FileUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
 
 /**
- *
+ * A singleton class used to initialize the extension and framework classloaders.
  */
 public final class NarClassLoaders {
 
@@ -49,18 +51,18 @@
 
         private final File frameworkWorkingDir;
         private final File extensionWorkingDir;
-        private final ClassLoader frameworkClassLoader;
-        private final Map<String, ClassLoader> extensionClassLoaders;
+        private final Bundle frameworkBundle;
+        private final Map<String, Bundle> bundles;
 
         private InitContext(
                 final File frameworkDir,
                 final File extensionDir,
-                final ClassLoader frameworkClassloader,
-                final Map<String, ClassLoader> extensionClassLoaders) {
+                final Bundle frameworkBundle,
+                final Map<String, Bundle> bundles) {
             this.frameworkWorkingDir = frameworkDir;
             this.extensionWorkingDir = extensionDir;
-            this.frameworkClassLoader = frameworkClassloader;
-            this.extensionClassLoaders = extensionClassLoaders;
+            this.frameworkBundle = frameworkBundle;
+            this.bundles = bundles;
         }
     }
 
@@ -127,12 +129,13 @@
         ClassLoader currentContextClassLoader = Thread.currentThread().getContextClassLoader();
 
         // find all nar files and create class loaders for them.
-        final Map<String, ClassLoader> extensionDirectoryClassLoaderLookup = new LinkedHashMap<>();
-        final Map<String, ClassLoader> narIdClassLoaderLookup = new HashMap<>();
+        final Map<String, Bundle> narDirectoryBundleLookup = new LinkedHashMap<>();
+        final Map<String, ClassLoader> narCoordinateClassLoaderLookup = new HashMap<>();
+        final Map<String, Set<BundleCoordinate>> narIdBundleLookup = new HashMap<>();
 
         // make sure the nar directory is there and accessible
-        FileUtils.ensureDirectoryExistAndCanAccess(frameworkWorkingDir);
-        FileUtils.ensureDirectoryExistAndCanAccess(extensionsWorkingDir);
+        FileUtils.ensureDirectoryExistAndCanReadAndWrite(frameworkWorkingDir);
+        FileUtils.ensureDirectoryExistAndCanReadAndWrite(extensionsWorkingDir);
 
         final List<File> narWorkingDirContents = new ArrayList<>();
         final File[] frameworkWorkingDirContents = frameworkWorkingDir.listFiles();
@@ -145,20 +148,30 @@
         }
 
         if (!narWorkingDirContents.isEmpty()) {
-            final List<NarDetails> narDetails = new ArrayList<>();
+            final List<BundleDetails> narDetails = new ArrayList<>();
+            final Map<String,String> narCoordinatesToWorkingDir = new HashMap<>();
 
             // load the nar details which includes and nar dependencies
             for (final File unpackedNar : narWorkingDirContents) {
-                final NarDetails narDetail = getNarDetails(unpackedNar);
-
-                // ensure the nar contained an identifier
-                if (narDetail.getNarId() == null) {
-                    logger.warn("No NAR Id found. Skipping: " + unpackedNar.getAbsolutePath());
-                    continue;
+                final BundleDetails narDetail;
+                try {
+                    narDetail = getNarDetails(unpackedNar);
+                } catch (IllegalStateException e) {
+                    logger.warn("Unable to load NAR {} due to {}, skipping...", unpackedNar.getAbsolutePath(), e.getMessage());
+                    continue;
                 }
 
-                // store the nar details
+                // prevent the application from starting when there are two NARs with same group, id, and version
+                final String narCoordinate = narDetail.getCoordinate().getCoordinate();
+                if (narCoordinatesToWorkingDir.containsKey(narCoordinate)) {
+                    final String existingNarWorkingDir = narCoordinatesToWorkingDir.get(narCoordinate);
+                    throw new IllegalStateException("Unable to load NAR with coordinates " + narCoordinate
+                            + " and working directory " + narDetail.getWorkingDirectory()
+                            + " because another NAR with the same coordinates already exists at " + existingNarWorkingDir);
+                }
+
                 narDetails.add(narDetail);
+                narCoordinatesToWorkingDir.put(narCoordinate, narDetail.getWorkingDirectory().getCanonicalPath());
             }
 
             int narCount;
@@ -167,22 +180,50 @@
                 narCount = narDetails.size();
 
                 // attempt to create each nar class loader
-                for (final Iterator<NarDetails> narDetailsIter = narDetails.iterator(); narDetailsIter.hasNext();) {
-                    final NarDetails narDetail = narDetailsIter.next();
-                    final String narDependencies = narDetail.getNarDependencyId();
+                for (final Iterator<BundleDetails> narDetailsIter = narDetails.iterator(); narDetailsIter.hasNext();) {
+                    final BundleDetails narDetail = narDetailsIter.next();
+                    final BundleCoordinate narDependencyCoordinate = narDetail.getDependencyCoordinate();
 
                     // see if this class loader is eligible for loading
                     ClassLoader narClassLoader = null;
-                    if (narDependencies == null) {
-                        narClassLoader = createNarClassLoader(narDetail.getNarWorkingDirectory(), currentContextClassLoader);
-                    } else if (narIdClassLoaderLookup.containsKey(narDetail.getNarDependencyId())) {
-                        narClassLoader = createNarClassLoader(narDetail.getNarWorkingDirectory(), narIdClassLoaderLookup.get(narDetail.getNarDependencyId()));
+                    if (narDependencyCoordinate == null) {
+                        narClassLoader = createNarClassLoader(narDetail.getWorkingDirectory(), currentContextClassLoader);
+                    } else {
+                        final String dependencyCoordinateStr = narDependencyCoordinate.getCoordinate();
+
+                        // if the declared dependency has already been loaded
+                        if (narCoordinateClassLoaderLookup.containsKey(dependencyCoordinateStr)) {
+                            final ClassLoader narDependencyClassLoader = narCoordinateClassLoaderLookup.get(dependencyCoordinateStr);
+                            narClassLoader = createNarClassLoader(narDetail.getWorkingDirectory(), narDependencyClassLoader);
+                        } else {
+                            // get all bundles that match the declared dependency id
+                            final Set<BundleCoordinate> coordinates = narIdBundleLookup.get(narDependencyCoordinate.getId());
+
+                            // ensure there are known bundles that match the declared dependency id
+                            if (coordinates != null && !coordinates.contains(narDependencyCoordinate)) {
+                                // ensure the declared dependency only has one possible bundle
+                                if (coordinates.size() == 1) {
+                                    // get the bundle with the matching id
+                                    final BundleCoordinate coordinate = coordinates.stream().findFirst().get();
+
+                                    // if that bundle is loaded, use it
+                                    if (narCoordinateClassLoaderLookup.containsKey(coordinate.getCoordinate())) {
+                                        logger.warn(String.format("While loading '%s' unable to locate exact NAR dependency '%s'. Only found one possible match '%s'. Continuing...",
+                                                narDetail.getCoordinate().getCoordinate(), dependencyCoordinateStr, coordinate.getCoordinate()));
+
+                                        final ClassLoader narDependencyClassLoader = narCoordinateClassLoaderLookup.get(coordinate.getCoordinate());
+                                        narClassLoader = createNarClassLoader(narDetail.getWorkingDirectory(), narDependencyClassLoader);
+                                    }
+                                }
+                            }
+                        }
                     }
 
                     // if we were able to create the nar class loader, store it and remove the details
-                    if (narClassLoader != null) {
-                        extensionDirectoryClassLoaderLookup.put(narDetail.getNarWorkingDirectory().getCanonicalPath(), narClassLoader);
-                        narIdClassLoaderLookup.put(narDetail.getNarId(), narClassLoader);
+                    final ClassLoader bundleClassLoader = narClassLoader;
+                    if (bundleClassLoader != null) {
+                        narDirectoryBundleLookup.put(narDetail.getWorkingDirectory().getCanonicalPath(), new Bundle(narDetail, bundleClassLoader));
+                        narCoordinateClassLoaderLookup.put(narDetail.getCoordinate().getCoordinate(), narClassLoader);
                         narDetailsIter.remove();
                     }
                 }
@@ -191,12 +232,18 @@
             } while (narCount != narDetails.size());
 
             // see if any nars couldn't be loaded
-            for (final NarDetails narDetail : narDetails) {
-                logger.warn(String.format("Unable to resolve required dependency '%s'. Skipping NAR %s", narDetail.getNarDependencyId(), narDetail.getNarWorkingDirectory().getAbsolutePath()));
+            for (final BundleDetails narDetail : narDetails) {
+                logger.warn(String.format("Unable to resolve required dependency '%s'. Skipping NAR '%s'",
+                        narDetail.getDependencyCoordinate().getId(), narDetail.getWorkingDirectory().getAbsolutePath()));
             }
         }
 
-        return new InitContext(frameworkWorkingDir, extensionsWorkingDir, narIdClassLoaderLookup.get(FRAMEWORK_NAR_ID), new LinkedHashMap<>(extensionDirectoryClassLoaderLookup));
+        // find the framework bundle, NarUnpacker already checked that there was a framework NAR and that there was only one
+        final Bundle frameworkBundle = narDirectoryBundleLookup.values().stream()
+                .filter(b -> b.getBundleDetails().getCoordinate().getId().equals(FRAMEWORK_NAR_ID))
+                .findFirst().orElse(null);
+
+        return new InitContext(frameworkWorkingDir, extensionsWorkingDir, frameworkBundle, new LinkedHashMap<>(narDirectoryBundleLookup));
     }
 
     /**
@@ -223,50 +270,36 @@
      * @return details about the NAR
      * @throws IOException ioe
      */
-    private static NarDetails getNarDetails(final File narDirectory) throws IOException {
-        final NarDetails narDetails = new NarDetails();
-        narDetails.setNarWorkingDirectory(narDirectory);
-
-        final File manifestFile = new File(narDirectory, "META-INF/MANIFEST.MF");
-        try (final FileInputStream fis = new FileInputStream(manifestFile)) {
-            final Manifest manifest = new Manifest(fis);
-            final Attributes attributes = manifest.getMainAttributes();
-
-            // get the nar details
-            narDetails.setNarId(attributes.getValue("Nar-Id"));
-            narDetails.setNarDependencyId(attributes.getValue("Nar-Dependency-Id"));
-        }
-
-        return narDetails;
+    private static BundleDetails getNarDetails(final File narDirectory) throws IOException {
+        return NarBundleUtil.fromNarDirectory(narDirectory);
     }
 
     /**
-     * @return the framework class loader
+     * @return the framework Bundle
      *
-     * @throws IllegalStateException if the frame class loader has not been
-     * loaded
+     * @throws IllegalStateException if the framework Bundle has not been loaded
      */
-    public ClassLoader getFrameworkClassLoader() {
+    public Bundle getFrameworkBundle() {
         if (initContext == null) {
-            throw new IllegalStateException("Framework class loader has not been loaded.");
+            throw new IllegalStateException("Framework bundle has not been loaded.");
         }
 
-        return initContext.frameworkClassLoader;
+        return initContext.frameworkBundle;
     }
 
     /**
      * @param extensionWorkingDirectory the directory
-     * @return the class loader for the specified working directory. Returns
-     * null when no class loader exists for the specified working directory
-     * @throws IllegalStateException if the class loaders have not been loaded
+     * @return the bundle for the specified working directory. Returns
+     * null when no bundle exists for the specified working directory
+     * @throws IllegalStateException if the bundles have not been loaded
      */
-    public ClassLoader getExtensionClassLoader(final File extensionWorkingDirectory) {
+    public Bundle getBundle(final File extensionWorkingDirectory) {
         if (initContext == null) {
             throw new IllegalStateException("Extensions class loaders have not been loaded.");
         }
 
         try {
-            return initContext.extensionClassLoaders.get(extensionWorkingDirectory.getCanonicalPath());
+            return initContext.bundles.get(extensionWorkingDirectory.getCanonicalPath());
         } catch (final IOException ioe) {
             if(logger.isDebugEnabled()){
                 logger.debug("Unable to get extension classloader for working directory '{}'", extensionWorkingDirectory);
@@ -276,45 +309,15 @@
     }
 
     /**
-     * @return the extension class loaders
-     * @throws IllegalStateException if the class loaders have not been loaded
+     * @return the extensions that have been loaded
+     * @throws IllegalStateException if the extensions have not been loaded
      */
-    public Set<ClassLoader> getExtensionClassLoaders() {
+    public Set<Bundle> getBundles() {
         if (initContext == null) {
-            throw new IllegalStateException("Extensions class loaders have not been loaded.");
+            throw new IllegalStateException("Bundles have not been loaded.");
         }
 
-        return new LinkedHashSet<>(initContext.extensionClassLoaders.values());
+        return new LinkedHashSet<>(initContext.bundles.values());
     }
 
-    private static class NarDetails {
-
-        private String narId;
-        private String narDependencyId;
-        private File narWorkingDirectory;
-
-        public String getNarDependencyId() {
-            return narDependencyId;
-        }
-
-        public void setNarDependencyId(String narDependencyId) {
-            this.narDependencyId = narDependencyId;
-        }
-
-        public String getNarId() {
-            return narId;
-        }
-
-        public void setNarId(String narId) {
-            this.narId = narId;
-        }
-
-        public File getNarWorkingDirectory() {
-            return narWorkingDirectory;
-        }
-
-        public void setNarWorkingDirectory(File narWorkingDirectory) {
-            this.narWorkingDirectory = narWorkingDirectory;
-        }
-    }
 }
diff --git a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/NarCloseable.java b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/NarCloseable.java
index 56aff9e..098b7d3 100644
--- a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/NarCloseable.java
+++ b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/NarCloseable.java
@@ -16,11 +16,11 @@
  */
 package org.apache.nifi.nar;
 
+import java.io.Closeable;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.Closeable;
-
 /**
  *
  */
@@ -48,7 +48,7 @@
     public static NarCloseable withComponentNarLoader(final Class componentClass, final String componentIdentifier) {
         final ClassLoader current = Thread.currentThread().getContextClassLoader();
 
-        ClassLoader componentClassLoader = ExtensionManager.getClassLoader(componentClass.getName(), componentIdentifier);
+        ClassLoader componentClassLoader = ExtensionManager.getInstanceClassLoader(componentIdentifier);
         if (componentClassLoader == null) {
             componentClassLoader = componentClass.getClassLoader();
         }
@@ -58,6 +58,20 @@
     }
 
     /**
+     * Sets the current thread context class loader to the provided class loader, and returns a NarCloseable that will
+     * return the current thread context class loader to its previous state.
+     *
+     * @param componentNarLoader the class loader to set as the current thread context class loader
+     *
+     * @return NarCloseable that will return the current thread context class loader to its previous state
+     */
+    public static NarCloseable withComponentNarLoader(final ClassLoader componentNarLoader) {
+        final ClassLoader current = Thread.currentThread().getContextClassLoader();
+        Thread.currentThread().setContextClassLoader(componentNarLoader);
+        return new NarCloseable(current);
+    }
+
+    /**
      * Creates a Closeable object that can be used to to switch to current class
      * loader to the framework class loader and will automatically set the
      * ClassLoader back to the previous class loader when closed
@@ -67,7 +81,7 @@
     public static NarCloseable withFrameworkNar() {
         final ClassLoader frameworkClassLoader;
         try {
-            frameworkClassLoader = NarClassLoaders.getInstance().getFrameworkClassLoader();
+            frameworkClassLoader = NarClassLoaders.getInstance().getFrameworkBundle().getClassLoader();
         } catch (final Exception e) {
             // This should never happen in a running instance, but it will occur in unit tests
             logger.error("Unable to access Framework ClassLoader due to " + e + ". Will continue without changing ClassLoaders.");
diff --git a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/NarManifestEntry.java b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/NarManifestEntry.java
new file mode 100644
index 0000000..8b02742
--- /dev/null
+++ b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/NarManifestEntry.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.nar;
+
+/**
+ * Enumeration of entries that will be in a NAR MANIFEST file.
+ */
+public enum NarManifestEntry {
+
+    NAR_GROUP("Nar-Group"),
+    NAR_ID("Nar-Id"),
+    NAR_VERSION("Nar-Version"),
+    NAR_DEPENDENCY_GROUP("Nar-Dependency-Group"),
+    NAR_DEPENDENCY_ID("Nar-Dependency-Id"),
+    NAR_DEPENDENCY_VERSION("Nar-Dependency-Version"),
+    BUILD_TAG("Build-Tag"),
+    BUILD_REVISION("Build-Revision"),
+    BUILD_BRANCH("Build-Branch"),
+    BUILD_TIMESTAMP("Build-Timestamp"),
+    BUILD_JDK("Build-Jdk"),
+    BUILT_BY("Built-By"),
+    ;
+
+    final String manifestName;
+
+    NarManifestEntry(String manifestName) {
+        this.manifestName = manifestName;
+    }
+
+    public String getManifestName() {
+        return manifestName;
+    }
+
+}
diff --git a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/NarThreadContextClassLoader.java b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/NarThreadContextClassLoader.java
index 6a66ba2..364d3a9 100644
--- a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/NarThreadContextClassLoader.java
+++ b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/NarThreadContextClassLoader.java
@@ -18,7 +18,9 @@
 
 import org.apache.nifi.authentication.LoginIdentityProvider;
 import org.apache.nifi.authorization.Authorizer;
+import org.apache.nifi.bundle.Bundle;
 import org.apache.nifi.components.Validator;
+import org.apache.nifi.components.state.StateProvider;
 import org.apache.nifi.controller.ControllerService;
 import org.apache.nifi.controller.repository.ContentRepository;
 import org.apache.nifi.controller.repository.FlowFileRepository;
@@ -68,6 +70,7 @@
         narSpecificClasses.add(FlowFileRepository.class);
         narSpecificClasses.add(FlowFileSwapManager.class);
         narSpecificClasses.add(ContentRepository.class);
+        narSpecificClasses.add(StateProvider.class);
     }
 
     private NarThreadContextClassLoader() {
@@ -187,15 +190,17 @@
         final ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader();
         Thread.currentThread().setContextClassLoader(NarThreadContextClassLoader.getInstance());
         try {
-            final ClassLoader detectedClassLoaderForType = ExtensionManager.getClassLoader(implementationClassName);
-            final Class<?> rawClass;
-            if (detectedClassLoaderForType == null) {
-                // try to find from the current class loader
-                rawClass = Class.forName(implementationClassName);
-            } else {
-                // try to find from the registered classloader for that type
-                rawClass = Class.forName(implementationClassName, true, ExtensionManager.getClassLoader(implementationClassName));
+            final List<Bundle> bundles = ExtensionManager.getBundles(implementationClassName);
+            if (bundles.size() == 0) {
+                throw new IllegalStateException(String.format("The specified implementation class '%s' is not known to this nifi.", implementationClassName));
             }
+            if (bundles.size() > 1) {
+                throw new IllegalStateException(String.format("More than one bundle was found for the specified implementation class '%s', only one is allowed.", implementationClassName));
+            }
+
+            final Bundle bundle = bundles.get(0);
+            final ClassLoader detectedClassLoaderForType = bundle.getClassLoader();
+            final Class<?> rawClass = Class.forName(implementationClassName, true, detectedClassLoaderForType);
 
             Thread.currentThread().setContextClassLoader(detectedClassLoaderForType);
             final Class<?> desiredClass = rawClass.asSubclass(typeDefinition);
diff --git a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/SystemBundle.java b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/SystemBundle.java
new file mode 100644
index 0000000..0fb2bad
--- /dev/null
+++ b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/SystemBundle.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.nar;
+
+import org.apache.nifi.bundle.Bundle;
+import org.apache.nifi.bundle.BundleCoordinate;
+import org.apache.nifi.bundle.BundleDetails;
+import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.File;
+
+/**
+ * Utility to create the system bundle.
+ */
+public final class SystemBundle {
+
+    public static final BundleCoordinate SYSTEM_BUNDLE_COORDINATE = new BundleCoordinate(
+            BundleCoordinate.DEFAULT_GROUP, "system", BundleCoordinate.DEFAULT_VERSION);
+
+    /**
+     * Returns a bundle representing the system class loader.
+     *
+     * @param niFiProperties a NiFiProperties instance which will be used to obtain the default NAR library path,
+     *                       which will become the working directory of the returned bundle
+     * @return a bundle for the system class loader
+     */
+    public static Bundle create(final NiFiProperties niFiProperties) {
+        final ClassLoader systemClassLoader = ClassLoader.getSystemClassLoader();
+
+        final String narLibraryDirectory = niFiProperties.getProperty(NiFiProperties.NAR_LIBRARY_DIRECTORY);
+        if (StringUtils.isBlank(narLibraryDirectory)) {
+            throw new IllegalStateException("Unable to create system bundle because " + NiFiProperties.NAR_LIBRARY_DIRECTORY + " was null or empty");
+        }
+
+        final BundleDetails systemBundleDetails = new BundleDetails.Builder()
+                .workingDir(new File(narLibraryDirectory))
+                .coordinate(SYSTEM_BUNDLE_COORDINATE)
+                .build();
+
+        return new Bundle(systemBundleDetails, systemClassLoader);
+    }
+}
diff --git a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/logback.xml b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/logback.xml
index 555a25c..d0257a1 100644
--- a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/logback.xml
+++ b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/logback.xml
@@ -36,9 +36,9 @@
             <!-- Provide a cap of 10 MB across all archive files -->
             <totalSizeCap>10MB</totalSizeCap>
         </rollingPolicy>
+        <immediateFlush>true</immediateFlush>
         <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
             <pattern>%date %level [%thread] %logger{40} %msg%n</pattern>
-            <immediateFlush>true</immediateFlush>
         </encoder>
     </appender>
 
diff --git a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/src/main/java/org/apache/nifi/minifi/MiNiFi.java b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/src/main/java/org/apache/nifi/minifi/MiNiFi.java
index 573b139..8ee1626 100644
--- a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/src/main/java/org/apache/nifi/minifi/MiNiFi.java
+++ b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/src/main/java/org/apache/nifi/minifi/MiNiFi.java
@@ -16,11 +16,23 @@
  */
 package org.apache.nifi.minifi;
 
+import org.apache.nifi.bundle.Bundle;
+import org.apache.nifi.nar.ExtensionManager;
+import org.apache.nifi.nar.NarClassLoaders;
+import org.apache.nifi.nar.NarUnpacker;
+import org.apache.nifi.nar.SystemBundle;
+import org.apache.nifi.util.FileUtils;
+import org.apache.nifi.util.NiFiProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.bridge.SLF4JBridgeHandler;
+
 import java.io.File;
 import java.io.IOException;
 import java.lang.Thread.UncaughtExceptionHandler;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
+import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.Executors;
@@ -32,15 +44,6 @@
 import java.util.concurrent.atomic.AtomicLong;
 
 // These are from the minifi-nar-utils
-import org.apache.nifi.nar.ExtensionManager;
-import org.apache.nifi.nar.NarClassLoaders;
-import org.apache.nifi.nar.NarUnpacker;
-import org.apache.nifi.util.FileUtils;
-
-import org.apache.nifi.util.NiFiProperties;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.slf4j.bridge.SLF4JBridgeHandler;
 
 public class MiNiFi {
 
@@ -111,13 +114,16 @@
         NarClassLoaders.getInstance().init(properties.getFrameworkWorkingDirectory(), properties.getExtensionsWorkingDirectory());
 
         // load the framework classloader
-        final ClassLoader frameworkClassLoader = NarClassLoaders.getInstance().getFrameworkClassLoader();
+        final ClassLoader frameworkClassLoader = NarClassLoaders.getInstance().getFrameworkBundle().getClassLoader();
         if (frameworkClassLoader == null) {
             throw new IllegalStateException("Unable to find the framework NAR ClassLoader.");
         }
 
+        final Bundle systemBundle = SystemBundle.create(properties);
+        final Set<Bundle> narBundles = NarClassLoaders.getInstance().getBundles();
+
         // discover the extensions
-        ExtensionManager.discoverExtensions(NarClassLoaders.getInstance().getExtensionClassLoaders());
+        ExtensionManager.discoverExtensions(systemBundle, narBundles);
         ExtensionManager.logClassLoaderMapping();
 
         // load the server from the framework classloader
diff --git a/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/MiNiFiPersistentProvenanceRepository.java b/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/MiNiFiPersistentProvenanceRepository.java
index 5ce83a6..cba2d45 100644
--- a/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/MiNiFiPersistentProvenanceRepository.java
+++ b/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/MiNiFiPersistentProvenanceRepository.java
@@ -16,27 +16,47 @@
  */
 package org.apache.nifi.provenance;
 
-import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.authorization.AuthorizationResult;
+import org.apache.nifi.authorization.AuthorizationResult.Result;
 import org.apache.nifi.authorization.Authorizer;
+import org.apache.nifi.authorization.RequestAction;
+import org.apache.nifi.authorization.resource.Authorizable;
 import org.apache.nifi.authorization.user.NiFiUser;
 import org.apache.nifi.events.EventReporter;
-import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.provenance.expiration.ExpirationAction;
 import org.apache.nifi.provenance.expiration.FileRemovalAction;
+import org.apache.nifi.provenance.index.EventIndexWriter;
 import org.apache.nifi.provenance.lineage.ComputeLineageSubmission;
+import org.apache.nifi.provenance.lucene.IndexManager;
+import org.apache.nifi.provenance.lucene.IndexingAction;
+import org.apache.nifi.provenance.lucene.LuceneUtil;
+import org.apache.nifi.provenance.lucene.SimpleIndexManager;
+import org.apache.nifi.provenance.lucene.UpdateMinimumEventId;
 import org.apache.nifi.provenance.search.Query;
+import org.apache.nifi.provenance.search.QueryResult;
 import org.apache.nifi.provenance.search.QuerySubmission;
 import org.apache.nifi.provenance.search.SearchableField;
 import org.apache.nifi.provenance.serialization.RecordReader;
 import org.apache.nifi.provenance.serialization.RecordReaders;
 import org.apache.nifi.provenance.serialization.RecordWriter;
 import org.apache.nifi.provenance.serialization.RecordWriters;
+import org.apache.nifi.provenance.serialization.StorageSummary;
 import org.apache.nifi.provenance.toc.TocReader;
 import org.apache.nifi.provenance.toc.TocUtil;
+import org.apache.nifi.provenance.util.NamedThreadFactory;
 import org.apache.nifi.reporting.Severity;
 import org.apache.nifi.util.FormatUtils;
 import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.util.RingBuffer;
+import org.apache.nifi.util.RingBuffer.ForEachEvaluator;
 import org.apache.nifi.util.StopWatch;
+import org.apache.nifi.util.Tuple;
+import org.apache.nifi.util.timebuffer.CountSizeEntityAccess;
+import org.apache.nifi.util.timebuffer.LongEntityAccess;
+import org.apache.nifi.util.timebuffer.TimedBuffer;
+import org.apache.nifi.util.timebuffer.TimedCountSize;
+import org.apache.nifi.util.timebuffer.TimestampedLong;
+import org.apache.nifi.web.ResourceNotFoundException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,10 +64,10 @@
 import java.io.File;
 import java.io.FileFilter;
 import java.io.FileNotFoundException;
+import java.io.FilenameFilter;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
-import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -61,8 +81,15 @@
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
@@ -72,25 +99,24 @@
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 
-import static org.apache.nifi.provenance.toc.TocUtil.getTocFile;
-
-
-// TODO: When API, FlowController, and supporting classes are refactored/reimplemented migrate this class and its accompanying imports to minifi package structure
 public class MiNiFiPersistentProvenanceRepository implements ProvenanceRepository {
 
     public static final String EVENT_CATEGORY = "Provenance Repository";
     private static final String FILE_EXTENSION = ".prov";
     private static final String TEMP_FILE_SUFFIX = ".prov.part";
     private static final long PURGE_EVENT_MILLISECONDS = 2500L; //Determines the frequency over which the task to delete old events will occur
-    public static final int SERIALIZATION_VERSION = 8;
     public static final Pattern NUMBER_PATTERN = Pattern.compile("\\d+");
+    public static final Pattern INDEX_PATTERN = Pattern.compile("index-\\d+");
+    public static final Pattern LOG_FILENAME_PATTERN = Pattern.compile("(\\d+).*\\.prov");
+    public static final int MAX_UNDELETED_QUERY_RESULTS = 10;
+    public static final int MAX_INDEXING_FAILURE_COUNT = 5; // how many indexing failures we will tolerate before skipping indexing for a prov file
+    public static final int MAX_JOURNAL_ROLLOVER_RETRIES = 5;
 
-
-    private static final Logger logger = LoggerFactory.getLogger(MiNiFiPersistentProvenanceRepository.class);
+    private static final Logger logger = LoggerFactory.getLogger(MiNiFiPersistentProvenanceRepository.class);
 
     private final long maxPartitionMillis;
     private final long maxPartitionBytes;
@@ -111,15 +137,21 @@
 
     private final AtomicLong streamStartTime = new AtomicLong(System.currentTimeMillis());
     private final RepositoryConfiguration configuration;
+    private final IndexConfiguration indexConfig;
+    private final IndexManager indexManager;
     private final boolean alwaysSync;
     private final int rolloverCheckMillis;
     private final int maxAttributeChars;
 
     private final ScheduledExecutorService scheduledExecService;
     private final ScheduledExecutorService rolloverExecutor;
+    private final ExecutorService queryExecService;
 
     private final List<ExpirationAction> expirationActions = new ArrayList<>();
 
+    private final ConcurrentMap<String, AsyncQuerySubmission> querySubmissionMap = new ConcurrentHashMap<>();
+    private final ConcurrentMap<String, AsyncLineageSubmission> lineageSubmissionMap = new ConcurrentHashMap<>();
+
     private final AtomicLong writerIndex = new AtomicLong(0L);
     private final AtomicLong storageDirectoryIndex = new AtomicLong(0L);
     private final AtomicLong bytesWrittenSinceRollover = new AtomicLong(0L);
@@ -129,26 +161,39 @@
 
     private final AtomicInteger dirtyWriterCount = new AtomicInteger(0);
 
-    private EventReporter eventReporter;
+    // we keep the last 1000 records on hand so that requests for the most recent records can be
+    // served without reading from disk. Since keeping them is a very cheap operation, it's worth the tiny expense.
+    private final RingBuffer<ProvenanceEventRecord> latestRecords = new RingBuffer<>(1000);
+    private EventReporter eventReporter; // effectively final
+    private Authorizer authorizer;  // effectively final
+    private ProvenanceAuthorizableFactory resourceFactory;  // effectively final
 
-    private final Lock idLock = new ReentrantLock();
-    private Long maxId = null;
+    private final TimedBuffer<TimedCountSize> updateCounts = new TimedBuffer<>(TimeUnit.SECONDS, 300, new CountSizeEntityAccess());
+    private final TimedBuffer<TimestampedLong> backpressurePauseMillis = new TimedBuffer<>(TimeUnit.SECONDS, 300, new LongEntityAccess());
 
-    public MiNiFiPersistentProvenanceRepository() throws IOException {
+    /**
+     * default no args constructor for service loading only.
+     */
+    public MiNiFiPersistentProvenanceRepository() {
         maxPartitionMillis = 0;
         maxPartitionBytes = 0;
         writers = null;
         configuration = null;
+        indexConfig = null;
+        indexManager = null;
         alwaysSync = false;
         rolloverCheckMillis = 0;
         maxAttributeChars = 0;
         scheduledExecService = null;
         rolloverExecutor = null;
+        queryExecService = null;
         eventReporter = null;
+        authorizer = null;
+        resourceFactory = null;
     }
 
     public MiNiFiPersistentProvenanceRepository(final NiFiProperties nifiProperties) throws IOException {
-        this(createRepositoryConfiguration(nifiProperties), 10000);
+        this(RepositoryConfiguration.create(nifiProperties), 10000);
     }
 
     public MiNiFiPersistentProvenanceRepository(final RepositoryConfiguration configuration, final int rolloverCheckMillis) throws IOException {
@@ -159,7 +204,7 @@
         this.configuration = configuration;
         this.maxAttributeChars = configuration.getMaxAttributeChars();
 
-        for (final File file : configuration.getStorageDirectories()) {
+        for (final File file : configuration.getStorageDirectories().values()) {
             final Path storageDirectory = file.toPath();
             final Path journalDirectory = storageDirectory.resolve("journals");
 
@@ -172,10 +217,13 @@
 
         this.maxPartitionMillis = configuration.getMaxEventFileLife(TimeUnit.MILLISECONDS);
         this.maxPartitionBytes = configuration.getMaxEventFileCapacity();
+        this.indexConfig = new IndexConfiguration(configuration);
+        this.indexManager = new SimpleIndexManager(configuration);
         this.alwaysSync = configuration.isAlwaysSync();
         this.rolloverCheckMillis = rolloverCheckMillis;
 
         scheduledExecService = Executors.newScheduledThreadPool(3, new NamedThreadFactory("Provenance Maintenance Thread"));
+        queryExecService = Executors.newFixedThreadPool(configuration.getQueryThreadPoolSize(), new NamedThreadFactory("Provenance Query Thread"));
 
         // The number of rollover threads is a little bit arbitrary but comes from the idea that multiple storage directories generally
         // live on separate physical partitions. As a result, we want to use at least one thread per partition in order to utilize the
@@ -185,8 +233,13 @@
         rolloverExecutor = Executors.newScheduledThreadPool(numRolloverThreads, new NamedThreadFactory("Provenance Repository Rollover Thread"));
     }
 
+    protected IndexManager getIndexManager() {
+        return indexManager;
+    }
+
     @Override
-    public void initialize(final EventReporter eventReporter, final Authorizer authorizer, final ProvenanceAuthorizableFactory resourceFactory) throws IOException {
+    public void initialize(final EventReporter eventReporter, final Authorizer authorizer, final ProvenanceAuthorizableFactory resourceFactory,
+                           final IdentifierLookup idLookup) throws IOException {
         writeLock.lock();
         try {
             if (initialized.getAndSet(true)) {
@@ -194,6 +247,8 @@
             }
 
             this.eventReporter = eventReporter;
+            this.authorizer = authorizer;
+            this.resourceFactory = resourceFactory;
 
             recover();
 
@@ -229,6 +284,7 @@
                     }
                 }, rolloverCheckMillis, rolloverCheckMillis, TimeUnit.MILLISECONDS);
 
+                expirationActions.add(new UpdateMinimumEventId(indexConfig));
                 expirationActions.add(new FileRemovalAction());
 
                 scheduledExecService.scheduleWithFixedDelay(new Runnable() {
@@ -253,66 +309,10 @@
         }
     }
 
-    private static RepositoryConfiguration createRepositoryConfiguration(final NiFiProperties properties) throws IOException {
-        final Map<String, Path> storageDirectories = properties.getProvenanceRepositoryPaths();
-        if (storageDirectories.isEmpty()) {
-            storageDirectories.put("provenance_repository", Paths.get("provenance_repository"));
-        }
-        final String storageTime = properties.getProperty(NiFiProperties.PROVENANCE_MAX_STORAGE_TIME, "24 hours");
-        final String storageSize = properties.getProperty(NiFiProperties.PROVENANCE_MAX_STORAGE_SIZE, "1 GB");
-        final String rolloverTime = properties.getProperty(NiFiProperties.PROVENANCE_ROLLOVER_TIME, "5 mins");
-        final String rolloverSize = properties.getProperty(NiFiProperties.PROVENANCE_ROLLOVER_SIZE, "100 MB");
-        final String shardSize = properties.getProperty(NiFiProperties.PROVENANCE_INDEX_SHARD_SIZE, "500 MB");
-        final int journalCount = properties.getIntegerProperty(NiFiProperties.PROVENANCE_JOURNAL_COUNT, 16);
-
-        final long storageMillis = FormatUtils.getTimeDuration(storageTime, TimeUnit.MILLISECONDS);
-        final long maxStorageBytes = DataUnit.parseDataSize(storageSize, DataUnit.B).longValue();
-        final long rolloverMillis = FormatUtils.getTimeDuration(rolloverTime, TimeUnit.MILLISECONDS);
-        final long rolloverBytes = DataUnit.parseDataSize(rolloverSize, DataUnit.B).longValue();
-
-        final boolean compressOnRollover = Boolean.parseBoolean(properties.getProperty(NiFiProperties.PROVENANCE_COMPRESS_ON_ROLLOVER));
-
-        final Boolean alwaysSync = Boolean.parseBoolean(properties.getProperty("nifi.provenance.repository.always.sync", "false"));
-
-        final int defaultMaxAttrChars = 65536;
-        final String maxAttrLength = properties.getProperty("nifi.provenance.repository.max.attribute.length", String.valueOf(defaultMaxAttrChars));
-        int maxAttrChars;
-        try {
-            maxAttrChars = Integer.parseInt(maxAttrLength);
-            // must be at least 36 characters because that's the length of the uuid attribute,
-            // which must be kept intact
-            if (maxAttrChars < 36) {
-                maxAttrChars = 36;
-                logger.warn("Found max attribute length property set to " + maxAttrLength + " but minimum length is 36; using 36 instead");
-            }
-        } catch (final Exception e) {
-            maxAttrChars = defaultMaxAttrChars;
-        }
-
-        final RepositoryConfiguration config = new RepositoryConfiguration();
-        for (final Path path : storageDirectories.values()) {
-            config.addStorageDirectory(path.toFile());
-        }
-        config.setCompressOnRollover(compressOnRollover);
-        config.setMaxEventFileCapacity(rolloverBytes);
-        config.setMaxEventFileLife(rolloverMillis, TimeUnit.MILLISECONDS);
-        config.setMaxRecordLife(storageMillis, TimeUnit.MILLISECONDS);
-        config.setMaxStorageCapacity(maxStorageBytes);
-        config.setJournalCount(journalCount);
-        config.setMaxAttributeChars(maxAttrChars);
-
-        if (shardSize != null) {
-            config.setDesiredIndexSize(DataUnit.parseDataSize(shardSize, DataUnit.B).longValue());
-        }
-
-        config.setAlwaysSync(alwaysSync);
-
-        return config;
-    }
 
     // protected in order to override for unit tests
     protected RecordWriter[] createWriters(final RepositoryConfiguration config, final long initialRecordId) throws IOException {
-        final List<File> storageDirectories = config.getStorageDirectories();
+        final List<File> storageDirectories = new ArrayList<>(config.getStorageDirectories().values());
 
         final RecordWriter[] writers = new RecordWriter[config.getJournalCount()];
         for (int i = 0; i < config.getJournalCount(); i++) {
@@ -320,7 +320,7 @@
             final File journalDirectory = new File(storageDirectory, "journals");
             final File journalFile = new File(journalDirectory, String.valueOf(initialRecordId) + ".journal." + i);
 
-            writers[i] = RecordWriters.newSchemaRecordWriter(journalFile, false, false);
+            writers[i] = RecordWriters.newSchemaRecordWriter(journalFile, idGenerator, false, false);
             writers[i].writeHeader(initialRecordId);
         }
 
@@ -329,8 +329,9 @@
     }
 
     /**
-     * @return the maximum number of characters that any Event attribute should contain. If the event contains
-     * more characters than this, the attribute may be truncated on retrieval
+     * @return the maximum number of characters that any Event attribute should
+     * contain. If the event contains more characters than this, the attribute
+     * may be truncated on retrieval
      */
     public int getMaxAttributeCharacters() {
         return maxAttributeChars;
@@ -351,13 +352,55 @@
         persistRecord(events);
     }
 
-    @Override
-    public List<ProvenanceEventRecord> getEvents(final long firstRecordId, final int maxRecords, final NiFiUser user) throws IOException {
-        throw new MethodNotSupportedException("Cannot list events for a specified user.");
+    public boolean isAuthorized(final ProvenanceEventRecord event, final NiFiUser user) {
+        if (authorizer == null || user == null) {
+            return true;
+        }
+
+        final Authorizable eventAuthorizable;
+        try {
+            if (event.isRemotePortType()) {
+                eventAuthorizable = resourceFactory.createRemoteDataAuthorizable(event.getComponentId());
+            } else {
+                eventAuthorizable = resourceFactory.createLocalDataAuthorizable(event.getComponentId());
+            }
+        } catch (final ResourceNotFoundException rnfe) {
+            return false;
+        }
+
+        final AuthorizationResult result = eventAuthorizable.checkAuthorization(authorizer, RequestAction.READ, user, event.getAttributes());
+        return Result.Approved.equals(result.getResult());
+    }
+
+    public void authorize(final ProvenanceEventRecord event, final NiFiUser user) {
+        if (authorizer == null) {
+            return;
+        }
+
+        final Authorizable eventAuthorizable;
+        if (event.isRemotePortType()) {
+            eventAuthorizable = resourceFactory.createRemoteDataAuthorizable(event.getComponentId());
+        } else {
+            eventAuthorizable = resourceFactory.createLocalDataAuthorizable(event.getComponentId());
+        }
+        eventAuthorizable.authorize(authorizer, RequestAction.READ, user, event.getAttributes());
+    }
+
+    public List<ProvenanceEventRecord> filterUnauthorizedEvents(final List<ProvenanceEventRecord> events, final NiFiUser user) {
+        return events.stream().filter(event -> isAuthorized(event, user)).collect(Collectors.<ProvenanceEventRecord>toList());
+    }
+
+    public Set<ProvenanceEventRecord> replaceUnauthorizedWithPlaceholders(final Set<ProvenanceEventRecord> events, final NiFiUser user) {
+        return events.stream().map(event -> isAuthorized(event, user) ? event : new PlaceholderProvenanceEvent(event)).collect(Collectors.toSet());
     }
 
     @Override
     public List<ProvenanceEventRecord> getEvents(final long firstRecordId, final int maxRecords) throws IOException {
+        return getEvents(firstRecordId, maxRecords, null);
+    }
+
+    @Override
+    public List<ProvenanceEventRecord> getEvents(final long firstRecordId, final int maxRecords, final NiFiUser user) throws IOException {
         final List<ProvenanceEventRecord> records = new ArrayList<>(maxRecords);
 
         final List<Path> paths = getPathsForId(firstRecordId);
@@ -382,7 +425,7 @@
 
                 StandardProvenanceEventRecord record;
                 while (records.size() < maxRecords && (record = reader.nextRecord()) != null) {
-                    if (record.getEventId() >= firstRecordId) {
+                    if (record.getEventId() >= firstRecordId && isAuthorized(record, user)) {
                         records.add(record);
                     }
                 }
@@ -459,9 +502,11 @@
 
     private void recover() throws IOException {
         long maxId = -1L;
+        long maxIndexedId = -1L;
+        long minIndexedId = Long.MAX_VALUE;
 
         final List<File> filesToRecover = new ArrayList<>();
-        for (final File file : configuration.getStorageDirectories()) {
+        for (final File file : configuration.getStorageDirectories().values()) {
             final File[] matchingFiles = file.listFiles(new FileFilter() {
                 @Override
                 public boolean accept(final File pathname) {
@@ -490,13 +535,21 @@
         for (final File file : filesToRecover) {
             final String filename = file.getName();
             final String baseName = filename.substring(0, filename.indexOf("."));
-            final long fileFirstId = Long.parseLong(baseName);
-            sortedPathMap.put(fileFirstId, file.toPath());
+            final long firstId = Long.parseLong(baseName);
+            sortedPathMap.put(firstId, file.toPath());
 
-            if (fileFirstId > maxId) {
-                maxId = fileFirstId;
+            if (firstId > maxId) {
+                maxId = firstId;
                 maxIdFile = file;
             }
+
+            if (firstId > maxIndexedId) {
+                maxIndexedId = firstId - 1;
+            }
+
+            if (firstId < minIndexedId) {
+                minIndexedId = firstId;
+            }
         }
 
         if (maxIdFile != null) {
@@ -505,17 +558,28 @@
                 final long eventId = reader.getMaxEventId();
                 if (eventId > maxId) {
                     maxId = eventId;
-                    checkAndSetMaxEventId(maxId);
                 }
 
+                // If the ID is greater than the max indexed id and this file was indexed, then
+                // update the max indexed id
+                if (eventId > maxIndexedId) {
+                    maxIndexedId = eventId;
+                }
             } catch (final IOException ioe) {
                 logger.error("Failed to read Provenance Event File {} due to {}", maxIdFile, ioe);
                 logger.error("", ioe);
             }
         }
 
-        // Establish current max event ID and increment generator to pick up from this point
-        checkAndSetMaxEventId(maxId);
+        if (maxIndexedId > -1L) {
+            // If we have indexed anything then set the min/max ID's indexed.
+            indexConfig.setMaxIdIndexed(maxIndexedId);
+        }
+
+        if (minIndexedId < Long.MAX_VALUE) {
+            indexConfig.setMinIdIndexed(minIndexedId);
+        }
+
         idGenerator.set(maxId + 1);
 
         try {
@@ -531,7 +595,7 @@
                     continue;
                 }
 
-                final String basename = StringUtils.substringBefore(recoveredJournal.getName(), ".");
+                final String basename = LuceneUtil.substringBefore(recoveredJournal.getName(), ".");
                 try {
                     final long minId = Long.parseLong(basename);
 
@@ -562,7 +626,14 @@
         idToPathMap.set(Collections.unmodifiableSortedMap(sortedPathMap));
         logger.trace("In recovery, path map: {}", sortedPathMap);
 
-        logger.info("Recovered records");
+        final long recordsRecovered;
+        if (minIndexedId < Long.MAX_VALUE) {
+            recordsRecovered = idGenerator.get() - minIndexedId;
+        } else {
+            recordsRecovered = idGenerator.get();
+        }
+
+        logger.info("Recovered {} records", recordsRecovered);
         recoveryFinished.set(true);
     }
 
@@ -575,6 +646,9 @@
 
             scheduledExecService.shutdownNow();
             rolloverExecutor.shutdownNow();
+            queryExecService.shutdownNow();
+
+            getIndexManager().close();
 
             if (writers != null) {
                 for (final RecordWriter writer : writers) {
@@ -606,7 +680,7 @@
                 final int numDirty = dirtyWriterCount.get();
                 if (numDirty >= recordWriters.length) {
                     throw new IllegalStateException("Cannot update repository because all partitions are unusable at this time. Writing to the repository would cause corruption. "
-                            + "This most often happens as a result of the repository running out of disk space or the JMV running out of memory.");
+                            + "This most often happens as a result of the repository running out of disk space or the JVM running out of memory.");
                 }
 
                 final long idx = writerIndex.getAndIncrement();
@@ -616,19 +690,23 @@
 
             try {
                 try {
+                    long recordsWritten = 0L;
                     for (final ProvenanceEventRecord nextRecord : records) {
-                        final long eventId = idGenerator.getAndIncrement();
-                        bytesWritten += writer.writeRecord(nextRecord, eventId);
-                        logger.trace("Wrote record with ID {} to {}", eventId, writer);
-                        checkAndSetMaxEventId(eventId);
+                        final StorageSummary persistedEvent = writer.writeRecord(nextRecord);
+                        bytesWritten += persistedEvent.getSerializedLength();
+                        recordsWritten++;
+                        logger.trace("Wrote record with ID {} to {}", persistedEvent.getEventId(), writer);
                     }
 
+                    writer.flush();
+
                     if (alwaysSync) {
                         writer.sync();
                     }
 
                     totalJournalSize = bytesWrittenSinceRollover.addAndGet(bytesWritten);
                     recordsWrittenSinceRollover.getAndIncrement();
+                    this.updateCounts.add(new TimedCountSize(recordsWritten, bytesWritten));
                 } catch (final Throwable t) {
                     // We need to set the repoDirty flag before we release the lock for this journal.
                     // Otherwise, another thread may write to this journal -- this is a problem because
@@ -702,7 +780,8 @@
     }
 
     /**
-     * @return all of the Provenance Event Log Files (not the journals, the merged files) available across all storage directories.
+     * @return all of the Provenance Event Log Files (not the journals, the
+     * merged files) available across all storage directories.
      */
     private List<File> getLogFiles() {
         final List<File> files = new ArrayList<>();
@@ -720,9 +799,11 @@
     /**
      * Returns the size, in bytes, of the Repository storage
      *
-     * @param logFiles   the log files to consider
-     * @param timeCutoff if a log file's last modified date is before timeCutoff, it will be skipped
-     * @return the size of all log files given whose last mod date comes after (or equal to) timeCutoff
+     * @param logFiles the log files to consider
+     * @param timeCutoff if a log file's last modified date is before
+     * timeCutoff, it will be skipped
+     * @return the size of all log files given whose last mod date comes after
+     * (or equal to) timeCutoff
      */
     public long getSize(final List<File> logFiles, final long timeCutoff) {
         long bytesUsed = 0L;
@@ -737,6 +818,8 @@
             bytesUsed += file.length();
         }
 
+        // take into account the size of the indices
+        bytesUsed += indexConfig.getIndexSize();
         return bytesUsed;
     }
 
@@ -774,8 +857,8 @@
         final Comparator<File> sortByBasenameComparator = new Comparator<File>() {
             @Override
             public int compare(final File o1, final File o2) {
-                final String baseName1 = StringUtils.substringBefore(o1.getName(), ".");
-                final String baseName2 = StringUtils.substringBefore(o2.getName(), ".");
+                final String baseName1 = LuceneUtil.substringBefore(o1.getName(), ".");
+                final String baseName2 = LuceneUtil.substringBefore(o2.getName(), ".");
 
                 Long id1 = null;
                 Long id2 = null;
@@ -829,7 +912,7 @@
         // Age off the data.
         final Set<String> removed = new LinkedHashSet<>();
         for (File file : uniqueFilesToPurge) {
-            final String baseName = StringUtils.substringBefore(file.getName(), ".");
+            final String baseName = LuceneUtil.substringBefore(file.getName(), ".");
             ExpirationAction currentAction = null;
             try {
                 for (final ExpirationAction action : expirationActions) {
@@ -852,9 +935,9 @@
                 logger.warn("Failed to perform Expiration Action {} on Provenance Event file {} due to {}; will not perform additional "
                         + "Expiration Actions on this file at this time", currentAction, file, t.toString());
                 logger.warn("", t);
-                eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, "Failed to perform Expiration Action " + currentAction +
-                        " on Provenance Event file " + file + " due to " + t.toString() + "; will not perform additional Expiration Actions " +
-                        "on this file at this time");
+                eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, "Failed to perform Expiration Action " + currentAction
+                        + " on Provenance Event file " + file + " due to " + t.toString() + "; will not perform additional Expiration Actions "
+                        + "on this file at this time");
             }
         }
 
@@ -872,7 +955,7 @@
             while (itr.hasNext()) {
                 final Map.Entry<Long, Path> entry = itr.next();
                 final String filename = entry.getValue().toFile().getName();
-                final String baseName = StringUtils.substringBefore(filename, ".");
+                final String baseName = LuceneUtil.substringBefore(filename, ".");
 
                 if (removed.contains(baseName)) {
                     itr.remove();
@@ -882,6 +965,69 @@
             updated = idToPathMap.compareAndSet(existingPathMap, newPathMap);
             logger.debug("After expiration, path map: {}", newPathMap);
         }
+
+        purgeExpiredIndexes();
+    }
+
+    private void purgeExpiredIndexes() throws IOException {
+        // Now that we have potentially removed expired Provenance Event Log Files, we can look at
+        // whether or not we can delete any of the indexes. An index can be deleted if all of the
+        // data that is associated with that index has already been deleted. In order to test this,
+        // we will get the timestamp of the earliest event and then compare that to the latest timestamp
+        // that would be indexed by the earliest index. If the event occurred after the timestamp of
+        // the latest index, then we can just delete the entire index altogether.
+
+        // find all of the index directories
+        final List<File> indexDirs = getAllIndexDirectories();
+        if (indexDirs.size() < 2) {
+            this.firstEventTimestamp = determineFirstEventTimestamp();
+            return;
+        }
+
+        // Indexes are named "index-XXX" where the XXX is the timestamp of the earliest event that
+        // could be in the index. Once we have finished with one index, we move on to another index,
+        // but we don't move on until we are finished with the previous index.
+        // Therefore, an efficient way to determine the latest timestamp of one index is to look at the
+        // timestamp of the next index (these could potentially overlap for one millisecond). This is
+        // efficient because we can determine the earliest timestamp of an index simply by looking at
+        // the name of the Index's directory.
+        final long latestTimestampOfFirstIndex = getIndexTimestamp(indexDirs.get(1));
+
+        // Get the timestamp of the first event in the first Provenance Event Log File and the ID of the last event
+        // in the event file.
+        final List<File> logFiles = getSortedLogFiles();
+        if (logFiles.isEmpty()) {
+            this.firstEventTimestamp = System.currentTimeMillis();
+            return;
+        }
+
+        final File firstLogFile = logFiles.get(0);
+        long earliestEventTime = System.currentTimeMillis();
+        long maxEventId = -1L;
+        try (final RecordReader reader = RecordReaders.newRecordReader(firstLogFile, null, Integer.MAX_VALUE)) {
+            final StandardProvenanceEventRecord event = reader.nextRecord();
+            earliestEventTime = event.getEventTime();
+            maxEventId = reader.getMaxEventId();
+        } catch (final IOException ioe) {
+            logger.warn("Unable to determine the maximum ID for Provenance Event Log File {}; values reported for the number of "
+                    + "events in the Provenance Repository may be inaccurate.", firstLogFile);
+        }
+
+        // check if we can delete the index safely.
+        if (latestTimestampOfFirstIndex <= earliestEventTime) {
+            // we can safely delete the first index because the latest event in the index is an event
+            // that has already been expired from the repository.
+            final File indexingDirectory = indexDirs.get(0);
+            getIndexManager().removeIndex(indexingDirectory);
+            indexConfig.removeIndexDirectory(indexingDirectory);
+            deleteDirectory(indexingDirectory);
+
+            if (maxEventId > -1L) {
+                indexConfig.setMinIdIndexed(maxEventId + 1L);
+            }
+        }
+
+        this.firstEventTimestamp = earliestEventTime;
     }
 
     private long determineFirstEventTimestamp() {
@@ -907,7 +1053,83 @@
     }
 
     /**
-     * Blocks the calling thread until the repository rolls over. This is intended for unit testing.
+     * Recursively deletes the given directory. If unable to delete the
+     * directory, will emit a WARN level log event and move on.
+     *
+     * @param dir the directory to delete
+     */
+    private void deleteDirectory(final File dir) {
+        if (dir == null || !dir.exists()) {
+            return;
+        }
+
+        final File[] children = dir.listFiles();
+        if (children == null) {
+            return;
+        }
+
+        for (final File child : children) {
+            if (child.isDirectory()) {
+                deleteDirectory(child);
+            } else if (!child.delete()) {
+                logger.warn("Unable to remove index directory {}; this directory should be cleaned up manually", child.getAbsolutePath());
+            }
+        }
+
+        if (!dir.delete()) {
+            logger.warn("Unable to remove index directory {}; this directory should be cleaned up manually", dir);
+        }
+    }
+
+    /**
+     * @return a List of all Index directories, sorted by timestamp of the
+     * earliest event that could be present in the index
+     */
+    private List<File> getAllIndexDirectories() {
+        final List<File> allIndexDirs = new ArrayList<>();
+        for (final File storageDir : configuration.getStorageDirectories().values()) {
+            final File[] indexDirs = storageDir.listFiles(new FilenameFilter() {
+                @Override
+                public boolean accept(final File dir, final String name) {
+                    return INDEX_PATTERN.matcher(name).matches();
+                }
+            });
+
+            if (indexDirs != null) {
+                for (final File indexDir : indexDirs) {
+                    allIndexDirs.add(indexDir);
+                }
+            }
+        }
+
+        Collections.sort(allIndexDirs, new Comparator<File>() {
+            @Override
+            public int compare(final File o1, final File o2) {
+                final long time1 = getIndexTimestamp(o1);
+                final long time2 = getIndexTimestamp(o2);
+                return Long.compare(time1, time2);
+            }
+        });
+
+        return allIndexDirs;
+    }
+
+    /**
+     * Takes a File that has a filename "index-" followed by a Long and returns
+     * the value of that Long
+     *
+     * @param indexDirectory the index directory to obtain the timestamp for
+     * @return the timestamp associated with the given index
+     */
+    private long getIndexTimestamp(final File indexDirectory) {
+        final String name = indexDirectory.getName();
+        final int dashIndex = name.indexOf("-");
+        return Long.parseLong(name.substring(dashIndex + 1));
+    }
+
+    /**
+     * Blocks the calling thread until the repository rolls over. This is
+     * intended for unit testing.
      */
     public void waitForRollover() {
         final int count = rolloverCompletions.get();
@@ -920,13 +1142,14 @@
     }
 
     /**
-     * @return the number of journal files that exist across all storage directories
+     * @return the number of journal files that exist across all storage
+     * directories
      */
     // made protected for testing purposes
     protected int getJournalCount() {
         // determine how many 'journals' we have in the journals directories
         int journalFileCount = 0;
-        for (final File storageDir : configuration.getStorageDirectories()) {
+        for (final File storageDir : configuration.getStorageDirectories().values()) {
             final File journalsDir = new File(storageDir, "journals");
             final File[] journalFiles = journalsDir.listFiles();
             if (journalFiles != null) {
@@ -937,7 +1160,6 @@
         return journalFileCount;
     }
 
-
     /**
      * Method is exposed for unit testing
      *
@@ -953,15 +1175,20 @@
         }
     }
 
+    protected long getRolloverRetryMillis() {
+        return 10000L;
+    }
+
     /**
      * <p>
      * MUST be called with the write lock held.
      * </p>
-     * <p>
-     * Rolls over the data in the journal files, merging them into a single Provenance Event Log File, and
-     * compressing as needed.
      *
-     * @param force if true, will force a rollover regardless of whether or not data has been written
+     * Rolls over the data in the journal files, merging them into a single
+     * Provenance Event Log File, and compressing and indexing as needed.
+     *
+     * @param force if true, will force a rollover regardless of whether or not
+     * data has been written
      * @throws IOException if unable to complete rollover
      */
     private void rollover(final boolean force) throws IOException {
@@ -992,76 +1219,89 @@
                 if (journalsToMerge.isEmpty()) {
                     logger.debug("No journals to merge; all RecordWriters were already closed");
                 } else {
-                    logger.debug("Going to merge {} files for journals starting with ID {}", journalsToMerge.size(), StringUtils.substringBefore(journalsToMerge.get(0).getName(), "."));
+                    logger.debug("Going to merge {} files for journals starting with ID {}", journalsToMerge.size(), LuceneUtil.substringBefore(journalsToMerge.get(0).getName(), "."));
                 }
             }
 
             // Choose a storage directory to store the merged file in.
             final long storageDirIdx = storageDirectoryIndex.getAndIncrement();
-            final List<File> storageDirs = configuration.getStorageDirectories();
+            final List<File> storageDirs = new ArrayList<>(configuration.getStorageDirectories().values());
             final File storageDir = storageDirs.get((int) (storageDirIdx % storageDirs.size()));
 
             Future<?> future = null;
             if (!journalsToMerge.isEmpty()) {
                 // Run the rollover logic in a background thread.
                 final AtomicReference<Future<?>> futureReference = new AtomicReference<>();
+                final AtomicInteger retryAttempts = new AtomicInteger(MAX_JOURNAL_ROLLOVER_RETRIES);
                 final int recordsWritten = recordsWrittenSinceRollover.getAndSet(0);
                 final Runnable rolloverRunnable = new Runnable() {
                     @Override
                     public void run() {
-                        try {
-                            final File fileRolledOver;
+                        File fileRolledOver = null;
 
+                        try {
                             try {
                                 fileRolledOver = mergeJournals(journalsToMerge, getMergeFile(journalsToMerge, storageDir), eventReporter);
                             } catch (final IOException ioe) {
                                 logger.error("Failed to merge Journal Files {} into a Provenance Log File due to {}", journalsToMerge, ioe.toString());
                                 logger.error("", ioe);
-                                return;
                             }
 
-                            if (fileRolledOver == null) {
-                                logger.debug("Couldn't merge journals. Will try again in 10 seconds. journalsToMerge: {}, storageDir: {}", journalsToMerge, storageDir);
-                                return;
-                            }
-                            final File file = fileRolledOver;
+                            if (fileRolledOver != null) {
 
-                            // update our map of id to Path
-                            // We need to make sure that another thread doesn't also update the map at the same time. We cannot
-                            // use the write lock when purging old events, and we want to use the same approach here.
-                            boolean updated = false;
-                            final Long fileFirstEventId = Long.valueOf(StringUtils.substringBefore(fileRolledOver.getName(), "."));
-                            while (!updated) {
-                                final SortedMap<Long, Path> existingPathMap = idToPathMap.get();
-                                final SortedMap<Long, Path> newIdToPathMap = new TreeMap<>(new PathMapComparator());
-                                newIdToPathMap.putAll(existingPathMap);
-                                newIdToPathMap.put(fileFirstEventId, file.toPath());
-                                updated = idToPathMap.compareAndSet(existingPathMap, newIdToPathMap);
-                            }
+                                final File file = fileRolledOver;
 
-                            logger.info("Successfully Rolled over Provenance Event file containing {} records", recordsWritten);
-                            rolloverCompletions.getAndIncrement();
-
-                            // We have finished successfully. Cancel the future so that we don't run anymore
-                            Future<?> future;
-                            while ((future = futureReference.get()) == null) {
-                                try {
-                                    Thread.sleep(10L);
-                                } catch (final InterruptedException ie) {
+                                // update our map of id to Path
+                                // We need to make sure that another thread doesn't also update the map at the same time. We cannot
+                                // use the write lock when purging old events, and we want to use the same approach here.
+                                boolean updated = false;
+                                final Long fileFirstEventId = Long.valueOf(LuceneUtil.substringBefore(fileRolledOver.getName(), "."));
+                                while (!updated) {
+                                    final SortedMap<Long, Path> existingPathMap = idToPathMap.get();
+                                    final SortedMap<Long, Path> newIdToPathMap = new TreeMap<>(new PathMapComparator());
+                                    newIdToPathMap.putAll(existingPathMap);
+                                    newIdToPathMap.put(fileFirstEventId, file.toPath());
+                                    updated = idToPathMap.compareAndSet(existingPathMap, newIdToPathMap);
                                 }
+
+                                final TimedCountSize countSize = updateCounts.getAggregateValue(System.currentTimeMillis() - TimeUnit.MILLISECONDS.convert(5, TimeUnit.MINUTES));
+                                logger.info("Successfully Rolled over Provenance Event file containing {} records. In the past 5 minutes, "
+                                                + "{} events have been written to the Provenance Repository, totaling {}",
+                                        recordsWritten, countSize.getCount(), FormatUtils.formatDataSize(countSize.getSize()));
                             }
 
-                            future.cancel(false);
-                        } catch (final Throwable t) {
-                            logger.error("Failed to rollover Provenance repository due to {}", t.toString());
-                            logger.error("", t);
+                            // If the files were rolled over, or we are out of retries, stop the future
+                            if (fileRolledOver != null || retryAttempts.decrementAndGet() == 0) {
+
+                                if (fileRolledOver == null && retryAttempts.get() == 0) {
+                                    logger.error("Failed to merge Journal Files {} after {} attempts.", journalsToMerge, MAX_JOURNAL_ROLLOVER_RETRIES);
+                                }
+
+                                rolloverCompletions.getAndIncrement();
+
+                                // Cancel the future so that we don't run anymore
+                                Future<?> future;
+                                while ((future = futureReference.get()) == null) {
+                                    try {
+                                        Thread.sleep(10L);
+                                    } catch (final InterruptedException ie) {
+                                    }
+                                }
+                                future.cancel(false);
+
+                            } else {
+                                logger.warn("Couldn't merge journals. Will try again. journalsToMerge: {}, storageDir: {}", journalsToMerge, storageDir);
+                            }
+                        } catch (final Exception e) {
+                            logger.error("Failed to merge journals. Will try again. journalsToMerge: {}, storageDir: {}, cause: {}", journalsToMerge, storageDir, e.toString());
+                            logger.error("", e);
                         }
                     }
                 };
 
                 // We are going to schedule the future to run immediately and then repeat every 10 seconds. This allows us to keep retrying if we
-                // fail for some reason. When we succeed, the Runnable will cancel itself.
-                future = rolloverExecutor.scheduleWithFixedDelay(rolloverRunnable, 0, 10, TimeUnit.SECONDS);
+                // fail for some reason. When we succeed or if retries are exceeded, the Runnable will cancel itself.
+                future = rolloverExecutor.scheduleWithFixedDelay(rolloverRunnable, 0, getRolloverRetryMillis(), TimeUnit.MILLISECONDS);
                 futureReference.set(future);
             }
 
@@ -1077,12 +1317,14 @@
 
             // check if we need to apply backpressure.
             // If we have too many journal files, or if the repo becomes too large, backpressure is necessary. Without it,
-            // if the rate at which provenance events are registered exceeds the rate at which we can compress/merge them,
-            // then eventually we will end up with all of the data stored in the 'journals' directory. This
+            // if the rate at which provenance events are registered exceeds the rate at which we can compress/merge/index them,
+            // then eventually we will end up with all of the data stored in the 'journals' directory and not yet indexed. This
             // would mean that the data would never even be accessible. In order to prevent this, if we exceeds 110% of the configured
             // max capacity for the repo, or if we have 5 sets of journal files waiting to be merged, we will block here until
             // that is no longer the case.
             if (journalFileCount > journalCountThreshold || repoSize > sizeThreshold) {
+                final long stopTheWorldStart = System.nanoTime();
+
                 logger.warn("The rate of the dataflow is exceeding the provenance recording rate. "
                         + "Slowing down flow to accommodate. Currently, there are {} journal files ({} bytes) and "
                         + "threshold for blocking is {} ({} bytes)", journalFileCount, repoSize, journalCountThreshold, sizeThreshold);
@@ -1124,8 +1366,12 @@
                     repoSize = getSize(getLogFiles(), 0L);
                 }
 
+                final long stopTheWorldNanos = System.nanoTime() - stopTheWorldStart;
+                backpressurePauseMillis.add(new TimestampedLong(stopTheWorldNanos));
+                final TimestampedLong pauseNanosLastFiveMinutes = backpressurePauseMillis.getAggregateValue(System.currentTimeMillis() - TimeUnit.MILLISECONDS.convert(5, TimeUnit.MINUTES));
                 logger.info("Provenance Repository has now caught up with rolling over journal files. Current number of "
-                        + "journal files to be rolled over is {}", journalFileCount);
+                                + "journal files to be rolled over is {}. Provenance Repository Back Pressure paused Session commits for {} ({} total in the last 5 minutes).",
+                        journalFileCount, FormatUtils.formatNanos(stopTheWorldNanos, true), FormatUtils.formatNanos(pauseNanosLastFiveMinutes.getValue(), true));
             }
 
             // we've finished rolling over successfully. Create new writers and reset state.
@@ -1136,7 +1382,6 @@
         }
     }
 
-
     // protected for use in unit tests
     protected Set<File> recoverJournalFiles() throws IOException {
         if (!configuration.isAllowRollover()) {
@@ -1146,8 +1391,8 @@
         final Map<String, List<File>> journalMap = new HashMap<>();
 
         // Map journals' basenames to the files with that basename.
-        final List<File> storageDirs = configuration.getStorageDirectories();
-        for (final File storageDir : configuration.getStorageDirectories()) {
+        final List<File> storageDirs = new ArrayList<>(configuration.getStorageDirectories().values());
+        for (final File storageDir : storageDirs) {
             final File journalDir = new File(storageDir, "journals");
             if (!journalDir.exists()) {
                 continue;
@@ -1163,7 +1408,7 @@
                     continue;
                 }
 
-                final String basename = StringUtils.substringBefore(journalFile.getName(), ".");
+                final String basename = LuceneUtil.substringBefore(journalFile.getName(), ".");
                 List<File> files = journalMap.get(basename);
                 if (files == null) {
                     files = new ArrayList<>();
@@ -1191,7 +1436,7 @@
         // verify that all Journal files have the same basename
         String canonicalBaseName = null;
         for (final File journal : journalFiles) {
-            final String basename =  StringUtils.substringBefore(journal.getName(), ".");
+            final String basename = LuceneUtil.substringBefore(journal.getName(), ".");
             if (canonicalBaseName == null) {
                 canonicalBaseName = basename;
             }
@@ -1205,24 +1450,40 @@
         return mergedFile;
     }
 
+    protected List<File> filterUnavailableFiles(final List<File> journalFiles) {
+        return journalFiles.stream().filter(file -> file.exists()).collect(Collectors.toList());
+    }
+
     /**
      * <p>
-     * Merges all of the given Journal Files into a single, merged Provenance Event Log File. As these records are merged, they will be compressed, if the repository is configured to compress records
-     * </p>
-     * <p>
-     * <p>
-     * If the repository is configured to compress the data, the file written to may not be the same as the <code>suggestedMergeFile</code>, as a filename extension of '.gz' may be appended. If the
-     * journals are successfully merged, the file that they were merged into will be returned. If unable to merge the records (for instance, because the repository has been closed or because the list
-     * of journal files was empty), this method will return <code>null</code>.
+     * Merges all of the given Journal Files into a single, merged Provenance
+     * Event Log File. As these records are merged, they will be compressed, if
+     * the repository is configured to compress records, and will be indexed.
      * </p>
      *
-     * @param journalFiles       the journal files to merge
+     * <p>
+     * If the repository is configured to compress the data, the file written to
+     * may not be the same as the <code>suggestedMergeFile</code>, as a filename
+     * extension of '.gz' may be appended. If the journals are successfully
+     * merged, the file that they were merged into will be returned. If unable
+     * to merge the records (for instance, because the repository has been
+     * closed or because the list of journal files was empty), this method will
+     * return <code>null</code>.
+     * </p>
+     *
+     * @param journalFiles the journal files to merge
      * @param suggestedMergeFile the file to write the merged records to
-     * @param eventReporter      the event reporter to report any warnings or errors to; may be null.
-     * @return the file that the given journals were merged into, or <code>null</code> if no records were merged.
-     * @throws IOException if a problem occurs writing to the mergedFile, reading from a journal
+     * @param eventReporter the event reporter to report any warnings or errors
+     * to; may be null.
+     *
+     * @return the file that the given journals were merged into, or
+     * <code>null</code> if no records were merged.
+     *
+     * @throws IOException if a problem occurs writing to the mergedFile,
+     * reading from a journal, or updating the Lucene Index.
      */
     File mergeJournals(final List<File> journalFiles, final File suggestedMergeFile, final EventReporter eventReporter) throws IOException {
+        logger.debug("Merging {} to {}", journalFiles, suggestedMergeFile);
         if (this.closed.get()) {
             logger.info("Provenance Repository has been closed; will not merge journal files to {}", suggestedMergeFile);
             return null;
@@ -1236,8 +1497,8 @@
         Collections.sort(journalFiles, new Comparator<File>() {
             @Override
             public int compare(final File o1, final File o2) {
-                final String suffix1 = StringUtils.substringAfterLast(o1.getName(), ".");
-                final String suffix2 = StringUtils.substringAfterLast(o2.getName(), ".");
+                final String suffix1 = LuceneUtil.substringAfterLast(o1.getName(), ".");
+                final String suffix2 = LuceneUtil.substringAfterLast(o2.getName(), ".");
 
                 try {
                     final int journalIndex1 = Integer.parseInt(suffix1);
@@ -1249,12 +1510,13 @@
             }
         });
 
-        final String firstJournalFile = journalFiles.get(0).getName();
-        final String firstFileSuffix = StringUtils.substringAfterLast(firstJournalFile, ".");
-        final boolean allPartialFiles = firstFileSuffix.equals("0");
+        // Search for any missing files. At this point they should have been written to disk; otherwise, we cannot continue.
+        // Missing files are most likely due to incomplete cleanup of files post merge.
+        final List<File> availableFiles = filterUnavailableFiles(journalFiles);
+        final int numAvailableFiles = availableFiles.size();
 
         // check if we have all of the "partial" files for the journal.
-        if (allPartialFiles) {
+        if (numAvailableFiles > 0) {
             if (suggestedMergeFile.exists()) {
                 // we have all "partial" files and there is already a merged file. Delete the data from the index
                 // because the merge file may not be fully merged. We will re-merge.
@@ -1276,16 +1538,7 @@
                 }
             }
         } else {
-            logger.warn("Cannot merge journal files {} because expected first file to end with extension '.0' "
-                    + "but it did not; assuming that the files were already merged but only some finished deletion "
-                    + "before restart. Deleting remaining partial journal files.", journalFiles);
-
-            for (final File file : journalFiles) {
-                if (!file.delete() && file.exists()) {
-                    logger.warn("Failed to delete unneeded journal file {}; this file should be cleaned up manually", file);
-                }
-            }
-
+            logger.warn("Cannot merge journal files {} because they do not exist on disk", journalFiles);
             return null;
         }
 
@@ -1299,7 +1552,7 @@
         final File writerFile = isCompress ? new File(suggestedMergeFile.getParentFile(), suggestedMergeFile.getName() + ".gz") : suggestedMergeFile;
 
         try {
-            for (final File journalFile : journalFiles) {
+            for (final File journalFile : availableFiles) {
                 try {
                     // Use MAX_VALUE for number of chars because we don't want to truncate the value as we write it
                     // out. This allows us to later decide that we want more characters and still be able to retrieve
@@ -1314,7 +1567,7 @@
                     }
 
                     if (eventReporter != null) {
-                        eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, "re " + ioe.toString());
+                        eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, "Failed to merge Journal Files due to " + ioe.toString());
                     }
                 }
             }
@@ -1337,16 +1590,19 @@
                 try {
                     record = reader.nextRecord();
                 } catch (final EOFException eof) {
+                    // record will be null and reader can no longer be used
                 } catch (final Exception e) {
-                    logger.warn("Failed to generate Provenance Event Record from Journal due to " + e + "; it's possible that the record wasn't "
-                            + "completely written to the file. This record will be skipped.");
+                    logger.warn("Failed to generate Provenance Event Record from Journal due to " + e + "; it's "
+                            + "possible that the record wasn't completely written to the file. This journal will be "
+                            + "skipped.");
                     if (logger.isDebugEnabled()) {
                         logger.warn("", e);
                     }
 
                     if (eventReporter != null) {
-                        eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, "Failed to read Provenance Event Record from Journal due to " + e +
-                                "; it's possible that hte record wasn't completely written to the file. This record will be skipped.");
+                        eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, "Failed to read Provenance Event "
+                                + "Record from Journal due to " + e + "; it's possible that the record wasn't "
+                                + "completely written to the file. This journal will be skipped.");
                     }
                 }
 
@@ -1365,37 +1621,181 @@
                 recordToReaderMap.put(record, reader);
             }
 
+            // We want to keep track of the last 1000 events in the files so that we can add them to 'ringBuffer'.
+            // However, we don't want to add them directly to ringBuffer, because once they are added to ringBuffer, they are
+            // available in query results. As a result, we can have the issue where we've not finished indexing the file
+            // but we try to create the lineage for events in that file. In order to avoid this, we will add the records
+            // to a temporary RingBuffer and after we finish merging the records will then copy the data to the
+            // ringBuffer provided as a method argument.
+            final RingBuffer<ProvenanceEventRecord> latestRecords = new RingBuffer<>(1000);
+
             // loop over each entry in the map, persisting the records to the merged file in order, and populating the map
             // with the next entry from the journal file from which the previous record was written.
-            try (final RecordWriter writer = RecordWriters.newSchemaRecordWriter(writerFile, configuration.isCompressOnRollover(), true)) {
+            try (final RecordWriter writer = RecordWriters.newSchemaRecordWriter(writerFile, idGenerator, configuration.isCompressOnRollover(), true)) {
                 writer.writeHeader(minEventId);
 
-                while (!recordToReaderMap.isEmpty()) {
-                    final Map.Entry<StandardProvenanceEventRecord, RecordReader> entry = recordToReaderMap.entrySet().iterator().next();
-                    final StandardProvenanceEventRecord record = entry.getKey();
-                    final RecordReader reader = entry.getValue();
+                final IndexingAction indexingAction = createIndexingAction();
 
-                    writer.writeRecord(record, record.getEventId());
-                    final int blockIndex = writer.getTocWriter().getCurrentBlockIndex();
+                final File indexingDirectory = indexConfig.getWritableIndexDirectory(writerFile, earliestTimestamp);
+                long maxId = 0L;
 
-                    records++;
+                final BlockingQueue<Tuple<StandardProvenanceEventRecord, Integer>> eventQueue = new LinkedBlockingQueue<>(100);
+                final AtomicBoolean finishedAdding = new AtomicBoolean(false);
+                final List<Future<?>> futures = new ArrayList<>();
 
-                    // Remove this entry from the map
-                    recordToReaderMap.remove(record);
+                final EventIndexWriter indexWriter = getIndexManager().borrowIndexWriter(indexingDirectory);
+                try {
+                    final ExecutorService exec = Executors.newFixedThreadPool(configuration.getIndexThreadPoolSize(), new ThreadFactory() {
+                        @Override
+                        public Thread newThread(final Runnable r) {
+                            final Thread t = Executors.defaultThreadFactory().newThread(r);
+                            t.setName("Index Provenance Events");
+                            return t;
+                        }
+                    });
 
-                    // Get the next entry from this reader and add it to the map
-                    StandardProvenanceEventRecord nextRecord = null;
-
+                    final AtomicInteger indexingFailureCount = new AtomicInteger(0);
                     try {
-                        nextRecord = reader.nextRecord();
-                    } catch (final EOFException eof) {
+                        for (int i = 0; i < configuration.getIndexThreadPoolSize(); i++) {
+                            final Callable<Object> callable = new Callable<Object>() {
+                                @Override
+                                public Object call() throws IOException {
+                                    while (!eventQueue.isEmpty() || !finishedAdding.get()) {
+                                        try {
+                                            final Tuple<StandardProvenanceEventRecord, Integer> tuple;
+                                            try {
+                                                tuple = eventQueue.poll(10, TimeUnit.MILLISECONDS);
+                                            } catch (final InterruptedException ie) {
+                                                Thread.currentThread().interrupt();
+                                                continue;
+                                            }
+
+                                            if (tuple == null) {
+                                                continue;
+                                            }
+
+                                            indexingAction.index(tuple.getKey(), indexWriter.getIndexWriter(), tuple.getValue());
+                                        } catch (final Throwable t) {
+                                            logger.error("Failed to index Provenance Event for " + writerFile + " to " + indexingDirectory, t);
+                                            if (indexingFailureCount.incrementAndGet() >= MAX_INDEXING_FAILURE_COUNT) {
+                                                return null;
+                                            }
+                                        }
+                                    }
+
+                                    return null;
+                                }
+                            };
+
+                            final Future<?> future = exec.submit(callable);
+                            futures.add(future);
+                        }
+
+                        boolean indexEvents = true;
+                        while (!recordToReaderMap.isEmpty()) {
+                            final Map.Entry<StandardProvenanceEventRecord, RecordReader> entry = recordToReaderMap.entrySet().iterator().next();
+                            final StandardProvenanceEventRecord record = entry.getKey();
+                            final RecordReader reader = entry.getValue();
+
+                            writer.writeRecord(record);
+                            final int blockIndex = writer.getTocWriter().getCurrentBlockIndex();
+
+                            boolean accepted = false;
+                            while (!accepted && indexEvents) {
+                                try {
+                                    accepted = eventQueue.offer(new Tuple<>(record, blockIndex), 10, TimeUnit.MILLISECONDS);
+                                } catch (final InterruptedException ie) {
+                                    Thread.currentThread().interrupt();
+                                }
+
+                                // If we weren't able to add anything to the queue, check if we have reached our max failure count.
+                                // We do this here because if we do reach our max failure count, all of the indexing threads will stop
+                                // performing their jobs. As a result, the queue will fill and we won't be able to add anything to it.
+                                // So, if the queue is filled, we will check if this is the case.
+                                if (!accepted && indexingFailureCount.get() >= MAX_INDEXING_FAILURE_COUNT) {
+                                    indexEvents = false;  // don't add anything else to the queue.
+                                    eventQueue.clear();
+
+                                    final String warning = String.format("Indexing Provenance Events for %s has failed %s times. This exceeds the maximum threshold of %s failures, "
+                                            + "so no more Provenance Events will be indexed for this Provenance file.", writerFile, indexingFailureCount.get(), MAX_INDEXING_FAILURE_COUNT);
+                                    logger.warn(warning);
+                                    if (eventReporter != null) {
+                                        eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, warning);
+                                    }
+                                }
+                            }
+
+                            maxId = record.getEventId();
+
+                            latestRecords.add(truncateAttributes(record));
+                            records++;
+
+                            // Remove this entry from the map
+                            recordToReaderMap.remove(record);
+
+                            // Get the next entry from this reader and add it to the map
+                            StandardProvenanceEventRecord nextRecord = null;
+
+                            try {
+                                nextRecord = reader.nextRecord();
+                            } catch (final EOFException eof) {
+                                // record will be null and reader can no longer be used
+                            } catch (final Exception e) {
+                                logger.warn("Failed to generate Provenance Event Record from Journal due to " + e
+                                        + "; it's possible that the record wasn't completely written to the file. "
+                                        + "The remainder of this journal will be skipped.");
+                                if (logger.isDebugEnabled()) {
+                                    logger.warn("", e);
+                                }
+
+                                if (eventReporter != null) {
+                                    eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, "Failed to read "
+                                            + "Provenance Event Record from Journal due to " + e + "; it's possible "
+                                            + "that the record wasn't completely written to the file. The remainder "
+                                            + "of this journal will be skipped.");
+                                }
+                            }
+
+                            if (nextRecord != null) {
+                                recordToReaderMap.put(nextRecord, reader);
+                            }
+                        }
+                    } finally {
+                        finishedAdding.set(true);
+                        exec.shutdown();
                     }
 
-                    if (nextRecord != null) {
-                        recordToReaderMap.put(nextRecord, reader);
+                    for (final Future<?> future : futures) {
+                        try {
+                            future.get();
+                        } catch (final ExecutionException ee) {
+                            final Throwable t = ee.getCause();
+                            if (t instanceof RuntimeException) {
+                                throw (RuntimeException) t;
+                            }
+
+                            throw new RuntimeException(t);
+                        } catch (final InterruptedException e) {
+                            Thread.currentThread().interrupt();
+                            throw new RuntimeException("Thread interrupted");
+                        }
                     }
+                } finally {
+                    getIndexManager().returnIndexWriter(indexWriter);
                 }
+
+                indexConfig.setMaxIdIndexed(maxId);
             }
+
+            // Records should now be available in the repository. We can copy the values from latestRecords to ringBuffer.
+            final RingBuffer<ProvenanceEventRecord> latestRecordBuffer = this.latestRecords;
+            latestRecords.forEach(new ForEachEvaluator<ProvenanceEventRecord>() {
+                @Override
+                public boolean evaluate(final ProvenanceEventRecord event) {
+                    latestRecordBuffer.add(event);
+                    return true;
+                }
+            });
         } finally {
             for (final RecordReader reader : readers) {
                 try {
@@ -1406,23 +1806,23 @@
         }
 
         // Success. Remove all of the journal files, as they're no longer needed, now that they've been merged.
-        for (final File journalFile : journalFiles) {
+        for (final File journalFile : availableFiles) {
             if (!journalFile.delete() && journalFile.exists()) {
                 logger.warn("Failed to remove temporary journal file {}; this file should be cleaned up manually", journalFile.getAbsolutePath());
 
                 if (eventReporter != null) {
-                    eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, "Failed to remove temporary journal file " +
-                            journalFile.getAbsolutePath() + "; this file should be cleaned up manually");
+                    eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, "Failed to remove temporary journal file "
+                            + journalFile.getAbsolutePath() + "; this file should be cleaned up manually");
                 }
             }
 
-            final File tocFile = getTocFile(journalFile);
+            final File tocFile = TocUtil.getTocFile(journalFile);
             if (!tocFile.delete() && tocFile.exists()) {
                 logger.warn("Failed to remove temporary journal TOC file {}; this file should be cleaned up manually", tocFile.getAbsolutePath());
 
                 if (eventReporter != null) {
-                    eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, "Failed to remove temporary journal TOC file " +
-                            tocFile.getAbsolutePath() + "; this file should be cleaned up manually");
+                    eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, "Failed to remove temporary journal TOC file "
+                            + tocFile.getAbsolutePath() + "; this file should be cleaned up manually");
                 }
             }
         }
@@ -1434,23 +1834,42 @@
         } else {
             final long nanos = System.nanoTime() - startNanos;
             final long millis = TimeUnit.MILLISECONDS.convert(nanos, TimeUnit.NANOSECONDS);
-            logger.info("Successfully merged {} journal files ({} records) into single Provenance Log File {} in {} milliseconds", journalFiles.size(), records, suggestedMergeFile, millis);
+            logger.info("Successfully merged {} journal files ({} records) into single Provenance Log File {} in {} milliseconds", numAvailableFiles, records, suggestedMergeFile, millis);
         }
 
         return writerFile;
     }
 
+    /**
+     * This method is protected and exists for testing purposes. This allows
+     * unit tests to extend this class and override the createIndexingAction
+     * method so that they can mock out the Indexing Action to throw Exceptions, count
+     * events indexed, etc.
+     */
+    protected IndexingAction createIndexingAction() {
+        return new IndexingAction(configuration.getSearchableFields(), configuration.getSearchableAttributes());
+    }
+
     private StandardProvenanceEventRecord truncateAttributes(final StandardProvenanceEventRecord original) {
         boolean requireTruncation = false;
 
-        for (final Map.Entry<String, String> entry : original.getAttributes().entrySet()) {
-            if (entry.getValue().length() > maxAttributeChars) {
+        for (final String updatedAttr : original.getUpdatedAttributes().values()) {
+            if (updatedAttr != null && updatedAttr.length() > maxAttributeChars) {
                 requireTruncation = true;
                 break;
             }
         }
 
         if (!requireTruncation) {
+            for (final String previousAttr : original.getPreviousAttributes().values()) {
+                if (previousAttr != null && previousAttr.length() > maxAttributeChars) {
+                    requireTruncation = true;
+                    break;
+                }
+            }
+        }
+
+        if (!requireTruncation) {
             return original;
         }
 
@@ -1464,15 +1883,47 @@
     private Map<String, String> truncateAttributes(final Map<String, String> original) {
         final Map<String, String> truncatedAttrs = new HashMap<>();
         for (final Map.Entry<String, String> entry : original.entrySet()) {
-            if (entry.getValue().length() > maxAttributeChars) {
-                truncatedAttrs.put(entry.getKey(), entry.getValue().substring(0, maxAttributeChars));
-            } else {
-                truncatedAttrs.put(entry.getKey(), entry.getValue());
-            }
+            String value = entry.getValue() != null && entry.getValue().length() > this.maxAttributeChars
+                    ? entry.getValue().substring(0, this.maxAttributeChars) : entry.getValue();
+            truncatedAttrs.put(entry.getKey(), value);
         }
         return truncatedAttrs;
     }
 
+    QueryResult queryEvents(final Query query, final NiFiUser user) throws IOException {
+        final QuerySubmission submission = submitQuery(query, user);
+        final QueryResult result = submission.getResult();
+        while (!result.isFinished()) {
+            try {
+                Thread.sleep(100L);
+            } catch (final InterruptedException ie) {
+                Thread.currentThread().interrupt();
+            }
+        }
+
+        if (result.getError() != null) {
+            throw new IOException(result.getError());
+        }
+        logger.info("{} got {} hits", query, result.getTotalHitCount());
+        return result;
+    }
+
+
+    @Override
+    public AsyncLineageSubmission submitExpandChildren(final long eventId, final NiFiUser user) {
+        throw new MethodNotSupportedException("Computation of lineage is not available for implementation " + this.getClass().getName());
+    }
+
+    @Override
+    public AsyncLineageSubmission submitExpandParents(final long eventId, final NiFiUser user) {
+        throw new MethodNotSupportedException("Computation of lineage is not available for implementation " + this.getClass().getName());
+    }
+
+    @Override
+    public AsyncLineageSubmission retrieveLineageSubmission(final String lineageIdentifier, final NiFiUser user) {
+        throw new MethodNotSupportedException("Computation of lineage is not available for implementation " + this.getClass().getName());
+    }
+
     @Override
     public ProvenanceEventRecord getEvent(final long id) throws IOException {
         final List<ProvenanceEventRecord> records = getEvents(id, 1);
@@ -1501,7 +1952,8 @@
     }
 
     /**
-     * @return a List of all Provenance Event Log Files, sorted in ascending order by the first Event ID in each file
+     * @return a List of all Provenance Event Log Files, sorted in ascending
+     * order by the first Event ID in each file
      */
     private List<File> getSortedLogFiles() {
         final List<Path> paths = new ArrayList<>(getAllLogFiles());
@@ -1519,8 +1971,14 @@
         return files;
     }
 
+    @Override
+    public ProvenanceEventRepository getProvenanceEventRepository() {
+        return this;
+    }
+
     /**
-     * Returns the Event ID of the first event in the given Provenance Event Log File.
+     * Returns the Event ID of the first event in the given Provenance Event Log
+     * File.
      *
      * @param logFile the log file from which to obtain the first Event ID
      * @return the ID of the first event in the given log file
@@ -1531,27 +1989,6 @@
         return Long.parseLong(name.substring(0, dotIndex));
     }
 
-    @Override
-    public Long getMaxEventId() {
-        idLock.lock();
-        try {
-            return this.maxId;
-        } finally {
-            idLock.unlock();
-        }
-    }
-
-    private void checkAndSetMaxEventId(long id) {
-        idLock.lock();
-        try {
-            if (maxId == null || id > maxId) {
-                maxId = id;
-            }
-        } finally {
-            idLock.unlock();
-        }
-    }
-
     public Collection<Path> getAllLogFiles() {
         final SortedMap<Long, Path> map = idToPathMap.get();
         return map == null ? new ArrayList<Path>() : map.values();
@@ -1574,22 +2011,56 @@
         }
     }
 
+    @Override
+    public Long getMaxEventId() {
+        return indexConfig.getMaxIdIndexed();
+    }
 
-    private static class NamedThreadFactory implements ThreadFactory {
+    private class GetMostRecentRunnable implements Runnable {
 
-        private final AtomicInteger counter = new AtomicInteger(0);
-        private final ThreadFactory defaultThreadFactory = Executors.defaultThreadFactory();
-        private final String namePrefix;
+        private final Query query;
+        private final AsyncQuerySubmission submission;
+        private final NiFiUser user;
 
-        public NamedThreadFactory(final String namePrefix) {
-            this.namePrefix = namePrefix;
+        public GetMostRecentRunnable(final Query query, final AsyncQuerySubmission submission, final NiFiUser user) {
+            this.query = query;
+            this.submission = submission;
+            this.user = user;
         }
 
         @Override
-        public Thread newThread(final Runnable r) {
-            final Thread thread = defaultThreadFactory.newThread(r);
-            thread.setName(namePrefix + "-" + counter.incrementAndGet());
-            return thread;
+        public void run() {
+            // get the max indexed event id
+            final Long maxEventId = indexConfig.getMaxIdIndexed();
+            if (maxEventId == null) {
+                submission.getResult().update(Collections.<ProvenanceEventRecord>emptyList(), 0);
+                return;
+            }
+
+            final int maxResults = query.getMaxResults();
+            final long startIndex = Math.max(maxEventId - query.getMaxResults(), 0L);
+
+            try {
+                Long minIndexedId = indexConfig.getMinIdIndexed();
+                if (minIndexedId == null) {
+                    minIndexedId = 0L;
+                }
+                final long totalNumDocs = maxEventId - minIndexedId;
+
+                final List<ProvenanceEventRecord> mostRecent = getEvents(startIndex, maxResults, user);
+                submission.getResult().update(mostRecent, totalNumDocs);
+            } catch (final IOException ioe) {
+                logger.error("Failed to retrieve records from Provenance Repository: " + ioe.toString());
+                if (logger.isDebugEnabled()) {
+                    logger.error("", ioe);
+                }
+
+                if (ioe.getMessage() == null) {
+                    submission.getResult().setError("Failed to retrieve records from Provenance Repository: " + ioe.toString());
+                } else {
+                    submission.getResult().setError("Failed to retrieve records from Provenance Repository: " + ioe.getMessage());
+                }
+            }
         }
     }
 
@@ -1627,6 +2098,11 @@
     }
 
     @Override
+    public ComputeLineageSubmission submitLineageComputation(String s, NiFiUser niFiUser) {
+        throw new MethodNotSupportedException("Computation of lineage is not available for implementation " + this.getClass().getName());
+    }
+
+    @Override
     public List<SearchableField> getSearchableFields() {
         throw new MethodNotSupportedException("Querying and indexing is not available for implementation " + this.getClass().getName());
     }
@@ -1637,37 +2113,12 @@
     }
 
     @Override
-    public AsyncLineageSubmission submitLineageComputation(final String flowFileUuid, NiFiUser niFiUser) {
-        throw new MethodNotSupportedException("Computation of lineage is not available for implementation " + this.getClass().getName());
-    }
-
-    @Override
     public ComputeLineageSubmission submitLineageComputation(long eventId, NiFiUser user) {
         throw new MethodNotSupportedException("Computation of lineage is not available for implementation " + this.getClass().getName());
     }
 
     @Override
-    public AsyncLineageSubmission submitExpandChildren(final long eventId, NiFiUser niFiUser) {
-        throw new MethodNotSupportedException("Computation of lineage is not available for implementation " + this.getClass().getName());
-    }
-
-    @Override
-    public AsyncLineageSubmission submitExpandParents(final long eventId, NiFiUser niFiUser) {
-        throw new MethodNotSupportedException("Computation of lineage is not available for implementation " + this.getClass().getName());
-    }
-
-    @Override
-    public AsyncLineageSubmission retrieveLineageSubmission(final String lineageIdentifier, NiFiUser niFiUser) {
-        throw new MethodNotSupportedException("Computation of lineage is not available for implementation " + this.getClass().getName());
-    }
-
-    @Override
     public ProvenanceEventRecord getEvent(final long id, final NiFiUser user) throws IOException {
         throw new MethodNotSupportedException("Cannot handle user authorization requests.");
     }
-
-    @Override
-    public ProvenanceEventRepository getProvenanceEventRepository() {
-        return this;
-    }
 }
diff --git a/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/MiNiFiPersistentProvenanceRepositoryTest.java b/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/MiNiFiPersistentProvenanceRepositoryTest.java
index 08d5fc6..780c691 100644
--- a/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/MiNiFiPersistentProvenanceRepositoryTest.java
+++ b/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/MiNiFiPersistentProvenanceRepositoryTest.java
@@ -16,39 +16,51 @@
  */
 package org.apache.nifi.provenance;
 
+import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.queryparser.classic.ParseException;
+import org.apache.nifi.authorization.user.NiFiUser;
 import org.apache.nifi.events.EventReporter;
 import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.provenance.MiNiFiPersistentProvenanceRepository.MethodNotSupportedException;
+import org.apache.nifi.provenance.lucene.IndexingAction;
 import org.apache.nifi.provenance.serialization.RecordReader;
 import org.apache.nifi.provenance.serialization.RecordReaders;
 import org.apache.nifi.provenance.serialization.RecordWriter;
+import org.apache.nifi.provenance.serialization.RecordWriters;
 import org.apache.nifi.reporting.Severity;
-import org.apache.nifi.stream.io.DataOutputStream;
+import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.util.file.FileUtils;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
+import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 import org.junit.rules.TestName;
 
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.zip.GZIPOutputStream;
 
 import static org.apache.nifi.provenance.TestUtil.createFlowFile;
 import static org.junit.Assert.assertEquals;
@@ -61,16 +73,23 @@
     @Rule
     public TestName name = new TestName();
 
+    @ClassRule
+    public static TemporaryFolder tempFolder = new TemporaryFolder();
+
     private MiNiFiPersistentProvenanceRepository repo;
-    private RepositoryConfiguration config;
+    private static RepositoryConfiguration config;
 
     public static final int DEFAULT_ROLLOVER_MILLIS = 2000;
     private EventReporter eventReporter;
     private List<ReportedEvent> reportedEvents = Collections.synchronizedList(new ArrayList<ReportedEvent>());
 
+    private static int headerSize;
+    private static int recordSize;
+    private static int recordSize2;
+
     private RepositoryConfiguration createConfiguration() {
         config = new RepositoryConfiguration();
-        config.addStorageDirectory(new File("target/storage/" + UUID.randomUUID().toString()));
+        config.addStorageDirectory("1", new File("target/storage/" + UUID.randomUUID().toString()));
         config.setCompressOnRollover(true);
         config.setMaxEventFileLife(2000L, TimeUnit.SECONDS);
         config.setCompressionBlockBytes(100);
@@ -82,6 +101,44 @@
         System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.provenance", "DEBUG");
     }
 
+    @BeforeClass
+    public static void findJournalSizes() throws IOException {
+        // determine header and record size
+
+        final Map<String, String> attributes = new HashMap<>();
+        final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
+        builder.setEventTime(System.currentTimeMillis());
+        builder.setEventType(ProvenanceEventType.RECEIVE);
+        builder.setTransitUri("nifi://unit-test");
+        attributes.put("uuid", "12345678-0000-0000-0000-012345678912");
+        builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
+        builder.setComponentId("1234");
+        builder.setComponentType("dummy processor");
+        final ProvenanceEventRecord record = builder.build();
+        builder.setComponentId("2345");
+        final ProvenanceEventRecord record2 = builder.build();
+
+        final File tempRecordFile = tempFolder.newFile("record.tmp");
+        System.out.println("findJournalSizes position 0 = " + tempRecordFile.length());
+
+        final AtomicLong idGenerator = new AtomicLong(0L);
+        final RecordWriter writer = RecordWriters.newSchemaRecordWriter(tempRecordFile, idGenerator, false, false);
+        writer.writeHeader(12345L);
+        writer.flush();
+        headerSize = Long.valueOf(tempRecordFile.length()).intValue();
+        writer.writeRecord(record);
+        writer.flush();
+        recordSize = Long.valueOf(tempRecordFile.length()).intValue() - headerSize;
+        writer.writeRecord(record2);
+        writer.flush();
+        recordSize2 = Long.valueOf(tempRecordFile.length()).intValue() - headerSize - recordSize;
+        writer.close();
+
+        System.out.println("headerSize =" + headerSize);
+        System.out.println("recordSize =" + recordSize);
+        System.out.println("recordSize2=" + recordSize2);
+    }
+
     @Before
     public void printTestName() {
         reportedEvents.clear();
@@ -98,31 +155,47 @@
 
     @After
     public void closeRepo() throws IOException {
-        if (repo != null) {
-            try {
-                repo.close();
-            } catch (final IOException ioe) {
-            }
+        if (repo == null) {
+            return;
+        }
+
+        try {
+            repo.close();
+        } catch (final IOException ioe) {
         }
 
         // Delete all of the storage files. We do this in order to clean up the tons of files that
         // we create but also to ensure that we have closed all of the file handles. If we leave any
         // streams open, for instance, this will throw an IOException, causing our unit test to fail.
-        for (final File storageDir : config.getStorageDirectories()) {
-            int i;
-            for (i = 0; i < 3; i++) {
-                try {
-                    FileUtils.deleteFile(storageDir, true);
-                    break;
-                } catch (final IOException ioe) {
-                    // if there is a virus scanner, etc. running in the background we may not be able to
-                    // delete the file. Wait a sec and try again.
-                    if (i == 2) {
-                        throw ioe;
-                    } else {
-                        try {
-                            Thread.sleep(1000L);
-                        } catch (final InterruptedException ie) {
+        if (config != null) {
+            for (final File storageDir : config.getStorageDirectories().values()) {
+                int i;
+                for (i = 0; i < 3; i++) {
+                    try {
+                        FileUtils.deleteFile(storageDir, true);
+                        break;
+                    } catch (final IOException ioe) {
+                        // if there is a virus scanner, etc. running in the background we may not be able to
+                        // delete the file. Wait a sec and try again.
+                        if (i == 2) {
+                            throw ioe;
+                        } else {
+                            try {
+                                System.out.println("file: " + storageDir.toString() + " exists=" + storageDir.exists());
+                                FileUtils.deleteFile(storageDir, true);
+                                break;
+                            } catch (final IOException ioe2) {
+                                // if there is a virus scanner, etc. running in the background we may not be able to
+                                // delete the file. Wait a sec and try again.
+                                if (i == 2) {
+                                    throw ioe2;
+                                } else {
+                                    try {
+                                        Thread.sleep(1000L);
+                                    } catch (final InterruptedException ie) {
+                                    }
+                                }
+                            }
                         }
                     }
                 }
@@ -136,6 +209,47 @@
         return eventReporter;
     }
 
+    private NiFiProperties properties = new NiFiProperties() {
+        @Override
+        public String getProperty(String key) {
+            if (key.equals(NiFiProperties.PROVENANCE_COMPRESS_ON_ROLLOVER)) {
+                return "true";
+            } else if (key.equals(NiFiProperties.PROVENANCE_ROLLOVER_TIME)) {
+                return "2000 millis";
+            } else if (key.equals(NiFiProperties.PROVENANCE_REPO_DIRECTORY_PREFIX + ".default")) {
+                createConfiguration();
+                return config.getStorageDirectories().values().iterator().next().getAbsolutePath();
+            } else {
+                return null;
+            }
+        }
+
+        @Override
+        public Set<String> getPropertyKeys() {
+            return new HashSet<>(Arrays.asList(
+                    NiFiProperties.PROVENANCE_COMPRESS_ON_ROLLOVER,
+                    NiFiProperties.PROVENANCE_ROLLOVER_TIME,
+                    NiFiProperties.PROVENANCE_REPO_DIRECTORY_PREFIX + ".default"));
+        }
+    };
+
+    @Test
+    public void constructorNoArgs() {
+        TestableMiNiFiPersistentProvenanceRepository tppr = new TestableMiNiFiPersistentProvenanceRepository();
+        assertEquals(0, tppr.getRolloverCheckMillis());
+    }
+
+    @Test
+    public void constructorNiFiProperties() throws IOException {
+        TestableMiNiFiPersistentProvenanceRepository tppr = new TestableMiNiFiPersistentProvenanceRepository(properties);
+        assertEquals(10000, tppr.getRolloverCheckMillis());
+    }
+
+    @Test
+    public void constructorConfig() throws IOException {
+        RepositoryConfiguration configuration = RepositoryConfiguration.create(properties);
+        new TestableMiNiFiPersistentProvenanceRepository(configuration, 20000);
+    }
 
     @Test
     public void testAddAndRecover() throws IOException, InterruptedException {
@@ -143,7 +257,7 @@
         config.setMaxEventFileCapacity(1L);
         config.setMaxEventFileLife(1, TimeUnit.SECONDS);
         repo = new MiNiFiPersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
-        repo.initialize(getEventReporter(), null, null);
+        repo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY);
 
         final Map<String, String> attributes = new HashMap<>();
         attributes.put("abc", "xyz");
@@ -163,19 +277,15 @@
             repo.registerEvent(record);
         }
 
-        Assert.assertEquals("Did not establish the correct, Max Event Id", 9, repo.getMaxEventId().intValue());
-
         Thread.sleep(1000L);
 
         repo.close();
         Thread.sleep(500L); // Give the repo time to shutdown (i.e., close all file handles, etc.)
 
         repo = new MiNiFiPersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
-        repo.initialize(getEventReporter(), null, null);
+        repo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY);
         final List<ProvenanceEventRecord> recoveredRecords = repo.getEvents(0L, 12);
 
-        Assert.assertEquals("Did not establish the correct, Max Event Id through recovery after reloading", 9, repo.getMaxEventId().intValue());
-
         assertEquals(10, recoveredRecords.size());
         for (int i = 0; i < 10; i++) {
             final ProvenanceEventRecord recovered = recoveredRecords.get(i);
@@ -186,14 +296,13 @@
         }
     }
 
-
     @Test
     public void testCompressOnRollover() throws IOException, InterruptedException, ParseException {
         final RepositoryConfiguration config = createConfiguration();
         config.setMaxEventFileLife(500, TimeUnit.MILLISECONDS);
         config.setCompressOnRollover(true);
         repo = new MiNiFiPersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
-        repo.initialize(getEventReporter(), null, null);
+        repo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY);
 
         final String uuid = "00000000-0000-0000-0000-000000000000";
         final Map<String, String> attributes = new HashMap<>();
@@ -216,39 +325,17 @@
         }
 
         repo.waitForRollover();
-        final File storageDir = config.getStorageDirectories().get(0);
+        final File storageDir = config.getStorageDirectories().values().iterator().next();
         final File compressedLogFile = new File(storageDir, "0.prov.gz");
         assertTrue(compressedLogFile.exists());
     }
 
-    @Test(expected = MethodNotSupportedException.class)
-    public void testLineageRequestNotSupported() throws IOException, InterruptedException, ParseException {
-        final RepositoryConfiguration config = createConfiguration();
-        config.setMaxRecordLife(3, TimeUnit.SECONDS);
-        config.setMaxStorageCapacity(1024L * 1024L);
-        config.setMaxEventFileLife(500, TimeUnit.MILLISECONDS);
-        config.setMaxEventFileCapacity(1024L * 1024L);
-        config.setSearchableFields(new ArrayList<>(SearchableFields.getStandardFields()));
-
-        repo = new MiNiFiPersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
-        repo.initialize(getEventReporter(), null, null);
-
-        final String uuid = "00000000-0000-0000-0000-000000000001";
-        final Map<String, String> attributes = new HashMap<>();
-        attributes.put("abc", "xyz");
-        attributes.put("uuid", uuid);
-        attributes.put("filename", "file-" + uuid);
-
-        repo.submitLineageComputation(uuid, null);
-    }
-
-
     @Test
     public void testCorrectProvenanceEventIdOnRestore() throws IOException {
         final RepositoryConfiguration config = createConfiguration();
         config.setMaxEventFileLife(1, TimeUnit.SECONDS);
         repo = new MiNiFiPersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
-        repo.initialize(getEventReporter(), null, null);
+        repo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY);
 
         final String uuid = "00000000-0000-0000-0000-000000000000";
         final Map<String, String> attributes = new HashMap<>();
@@ -274,7 +361,7 @@
         repo.close();
 
         final MiNiFiPersistentProvenanceRepository secondRepo = new MiNiFiPersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
-        secondRepo.initialize(getEventReporter(), null, null);
+        secondRepo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY);
 
         try {
             final ProvenanceEventRecord event11 = builder.build();
@@ -283,45 +370,11 @@
             final ProvenanceEventRecord event11Retrieved = secondRepo.getEvent(10L);
             assertNotNull(event11Retrieved);
             assertEquals(10, event11Retrieved.getEventId());
-            Assert.assertEquals(10, secondRepo.getMaxEventId().intValue());
         } finally {
             secondRepo.close();
         }
     }
 
-    /**
-     * Here the event file is simply corrupted by virtue of not having any event
-     * records while having correct headers
-     */
-    @Test
-    public void testWithWithEventFileMissingRecord() throws Exception {
-        File eventFile = this.prepCorruptedEventFileTests();
-
-        DataOutputStream in = new DataOutputStream(new GZIPOutputStream(new FileOutputStream(eventFile)));
-        in.writeUTF("BlahBlah");
-        in.writeInt(4);
-        in.close();
-        assertTrue(eventFile.exists());
-
-        final List<ProvenanceEventRecord> events = repo.getEvents(0, 100);
-        assertEquals(10, events.size());
-    }
-
-    /**
-     * Here the event file is simply corrupted by virtue of being empty (0
-     * bytes)
-     */
-    @Test
-    public void testWithWithEventFileCorrupted() throws Exception {
-        File eventFile = this.prepCorruptedEventFileTests();
-
-        DataOutputStream in = new DataOutputStream(new GZIPOutputStream(new FileOutputStream(eventFile)));
-        in.close();
-
-        final List<ProvenanceEventRecord> events = repo.getEvents(0, 100);
-        assertEquals(10, events.size());
-    }
-
     private File prepCorruptedEventFileTests() throws Exception {
         RepositoryConfiguration config = createConfiguration();
         config.setMaxStorageCapacity(1024L * 1024L);
@@ -331,13 +384,13 @@
         config.setDesiredIndexSize(10);
 
         repo = new MiNiFiPersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
-        repo.initialize(getEventReporter(), null, null);
+        repo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY);
 
         String uuid = UUID.randomUUID().toString();
         for (int i = 0; i < 20; i++) {
             ProvenanceEventRecord record = repo.eventBuilder().fromFlowFile(mock(FlowFile.class))
-                .setEventType(ProvenanceEventType.CREATE).setComponentId("foo-" + i).setComponentType("myComponent")
-                .setFlowFileUUID(uuid).build();
+                    .setEventType(ProvenanceEventType.CREATE).setComponentId("foo-" + i).setComponentType("myComponent")
+                    .setFlowFileUUID(uuid).build();
             repo.registerEvent(record);
             if (i == 9) {
                 repo.waitForRollover();
@@ -345,7 +398,7 @@
             }
         }
         repo.waitForRollover();
-        File eventFile = new File(config.getStorageDirectories().get(0), "10.prov.gz");
+        File eventFile = new File(config.getStorageDirectories().values().iterator().next(), "10.prov.gz");
         assertTrue(eventFile.delete());
         return eventFile;
     }
@@ -364,7 +417,7 @@
                 return journalCountRef.get();
             }
         };
-        repo.initialize(getEventReporter(), null, null);
+        repo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY);
 
         final Map<String, String> attributes = new HashMap<>();
         final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
@@ -410,6 +463,30 @@
         builder.fromFlowFile(createFlowFile(15, 3000L, attributes));
         attributes.put("uuid", "00000000-0000-0000-0000-00000000000" + 15);
         repo.registerEvent(builder.build());
+
+        Thread.sleep(3000L);
+    }
+
+    private long checkJournalRecords(final File storageDir, final Boolean exact) throws IOException {
+        File[] storagefiles = storageDir.listFiles();
+        long counter = 0;
+        assertNotNull(storagefiles);
+        for (final File file : storagefiles) {
+            if (file.isFile()) {
+                try (RecordReader reader = RecordReaders.newRecordReader(file, null, 2048)) {
+                    ProvenanceEventRecord r;
+                    ProvenanceEventRecord last = null;
+                    while ((r = reader.nextRecord()) != null) {
+                        if (exact) {
+                            assertTrue(counter++ == r.getEventId());
+                        } else {
+                            assertTrue(counter++ <= r.getEventId());
+                        }
+                    }
+                }
+            }
+        }
+        return counter;
     }
 
     @Test
@@ -417,7 +494,7 @@
         final RepositoryConfiguration config = createConfiguration();
         config.setMaxEventFileLife(3, TimeUnit.SECONDS);
         repo = new MiNiFiPersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
-        repo.initialize(getEventReporter(), null, null);
+        repo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY);
 
         final Map<String, String> attributes = new HashMap<>();
 
@@ -444,7 +521,7 @@
 
         repo.waitForRollover();
 
-        final File storageDir = config.getStorageDirectories().get(0);
+        final File storageDir = config.getStorageDirectories().values().iterator().next();
         long counter = 0;
         for (final File file : storageDir.listFiles()) {
             if (file.isFile()) {
@@ -462,16 +539,231 @@
         assertEquals(10000, counter);
     }
 
+    private void corruptJournalFile(final File journalFile, final int position,
+                                    final String original, final String replacement) throws IOException {
+        final int journalLength = Long.valueOf(journalFile.length()).intValue();
+        final byte[] origBytes = original.getBytes();
+        final byte[] replBytes = replacement.getBytes();
+        FileInputStream journalIn = new FileInputStream(journalFile);
+        byte[] content = new byte[journalLength];
+        assertEquals(journalLength, journalIn.read(content, 0, journalLength));
+        journalIn.close();
+        assertEquals(original, new String(Arrays.copyOfRange(content, position, position + origBytes.length)));
+        System.arraycopy(replBytes, 0, content, position, replBytes.length);
+        FileOutputStream journalOut = new FileOutputStream(journalFile);
+        journalOut.write(content, 0, journalLength);
+        journalOut.flush();
+        journalOut.close();
+    }
+
+    @Test
+    public void testMergeJournalsBadFirstRecord() throws IOException, InterruptedException {
+        final RepositoryConfiguration config = createConfiguration();
+        config.setMaxEventFileLife(3, TimeUnit.SECONDS);
+        TestableMiNiFiPersistentProvenanceRepository testRepo = new TestableMiNiFiPersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
+        testRepo.initialize(getEventReporter(), null, null, null);
+
+        final Map<String, String> attributes = new HashMap<>();
+
+        final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
+        builder.setEventTime(System.currentTimeMillis());
+        builder.setEventType(ProvenanceEventType.RECEIVE);
+        builder.setTransitUri("nifi://unit-test");
+        attributes.put("uuid", "12345678-0000-0000-0000-012345678912");
+        builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
+        builder.setComponentId("1234");
+        builder.setComponentType("dummy processor");
+
+        final ProvenanceEventRecord record = builder.build();
+
+        final ExecutorService exec = Executors.newFixedThreadPool(10);
+        final List<Future> futures = new ArrayList<>();
+        for (int i = 0; i < 10000; i++) {
+            futures.add(exec.submit(new Runnable() {
+                @Override
+                public void run() {
+                    testRepo.registerEvent(record);
+                }
+            }));
+        }
+
+        // wait for writers to finish and then corrupt the first record of the first journal file
+        for (Future future : futures) {
+            while (!future.isDone()) {
+                Thread.sleep(10);
+            }
+        }
+        RecordWriter firstWriter = testRepo.getWriters()[0];
+        corruptJournalFile(firstWriter.getFile(), headerSize + 15,"RECEIVE", "BADTYPE");
+
+        testRepo.recoverJournalFiles();
+
+        final File storageDir = config.getStorageDirectories().values().iterator().next();
+        assertTrue(checkJournalRecords(storageDir, false) < 10000);
+    }
+
+    @Test
+    public void testMergeJournalsBadRecordAfterFirst() throws IOException, InterruptedException {
+        final RepositoryConfiguration config = createConfiguration();
+        config.setMaxEventFileLife(3, TimeUnit.SECONDS);
+        TestableMiNiFiPersistentProvenanceRepository testRepo = new TestableMiNiFiPersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
+        testRepo.initialize(getEventReporter(), null, null, null);
+
+        final Map<String, String> attributes = new HashMap<>();
+
+        final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
+        builder.setEventTime(System.currentTimeMillis());
+        builder.setEventType(ProvenanceEventType.RECEIVE);
+        builder.setTransitUri("nifi://unit-test");
+        attributes.put("uuid", "12345678-0000-0000-0000-012345678912");
+        builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
+        builder.setComponentId("1234");
+        builder.setComponentType("dummy processor");
+
+        final ProvenanceEventRecord record = builder.build();
+
+        final ExecutorService exec = Executors.newFixedThreadPool(10);
+        final List<Future<?>> futures = new ArrayList<>();
+        for (int i = 0; i < 10000; i++) {
+            futures.add(exec.submit(new Runnable() {
+                @Override
+                public void run() {
+                    testRepo.registerEvent(record);
+                }
+            }));
+        }
+
+        // wait for writers to finish and then corrupt the first record of the first journal file
+        for (Future<?> future : futures) {
+            while (!future.isDone()) {
+                Thread.sleep(10);
+            }
+        }
+        RecordWriter firstWriter = testRepo.getWriters()[0];
+        corruptJournalFile(firstWriter.getFile(), headerSize + 15 + recordSize, "RECEIVE", "BADTYPE");
+
+        testRepo.recoverJournalFiles();
+
+        final File storageDir = config.getStorageDirectories().values().iterator().next();
+        assertTrue(checkJournalRecords(storageDir, false) < 10000);
+    }
+
+    @Test
+    public void testMergeJournalsEmptyJournal() throws IOException, InterruptedException {
+        final RepositoryConfiguration config = createConfiguration();
+        config.setMaxEventFileLife(3, TimeUnit.SECONDS);
+        TestableMiNiFiPersistentProvenanceRepository testRepo = new TestableMiNiFiPersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
+        testRepo.initialize(getEventReporter(), null, null, null);
+
+        final Map<String, String> attributes = new HashMap<>();
+
+        final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
+        builder.setEventTime(System.currentTimeMillis());
+        builder.setEventType(ProvenanceEventType.RECEIVE);
+        builder.setTransitUri("nifi://unit-test");
+        attributes.put("uuid", "12345678-0000-0000-0000-012345678912");
+        builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
+        builder.setComponentId("1234");
+        builder.setComponentType("dummy processor");
+
+        final ProvenanceEventRecord record = builder.build();
+
+        final ExecutorService exec = Executors.newFixedThreadPool(10);
+        final List<Future> futures = new ArrayList<>();
+        for (int i = 0; i < config.getJournalCount() - 1; i++) {
+            futures.add(exec.submit(new Runnable() {
+                @Override
+                public void run() {
+                    testRepo.registerEvent(record);
+                }
+            }));
+        }
+
+        // wait for writers to finish; no corruption here — one journal is intentionally left empty
+        for (Future future : futures) {
+            while (!future.isDone()) {
+                Thread.sleep(10);
+            }
+        }
+
+        testRepo.recoverJournalFiles();
+
+        assertEquals("mergeJournals() should not error on empty journal", 0, reportedEvents.size());
+
+        final File storageDir = config.getStorageDirectories().values().iterator().next();
+        assertEquals(config.getJournalCount() - 1, checkJournalRecords(storageDir, true));
+    }
+
+    @Test
+    public void testRolloverRetry() throws IOException, InterruptedException {
+        final AtomicInteger retryAmount = new AtomicInteger(0);
+        final RepositoryConfiguration config = createConfiguration();
+        config.setMaxEventFileLife(3, TimeUnit.SECONDS);
+
+        repo = new MiNiFiPersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS){
+            @Override
+            File mergeJournals(List<File> journalFiles, File suggestedMergeFile, EventReporter eventReporter) throws IOException {
+                retryAmount.incrementAndGet();
+                return super.mergeJournals(journalFiles, suggestedMergeFile, eventReporter);
+            }
+
+            // Indicate that there are no files available.
+            @Override
+            protected List<File> filterUnavailableFiles(List<File> journalFiles) {
+                return Collections.emptyList();
+            }
+
+            @Override
+            protected long getRolloverRetryMillis() {
+                return 10L; // retry quickly.
+            }
+        };
+        repo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY);
+
+        final Map<String, String> attributes = new HashMap<>();
+
+        final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
+        builder.setEventTime(System.currentTimeMillis());
+        builder.setEventType(ProvenanceEventType.RECEIVE);
+        builder.setTransitUri("nifi://unit-test");
+        attributes.put("uuid", "12345678-0000-0000-0000-012345678912");
+        builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
+        builder.setComponentId("1234");
+        builder.setComponentType("dummy processor");
+
+        final ProvenanceEventRecord record = builder.build();
+
+        final ExecutorService exec = Executors.newFixedThreadPool(10);
+        for (int i = 0; i < 10000; i++) {
+            exec.submit(new Runnable() {
+                @Override
+                public void run() {
+                    repo.registerEvent(record);
+                }
+            });
+        }
+        exec.shutdown();
+        exec.awaitTermination(10, TimeUnit.SECONDS);
+
+        repo.waitForRollover();
+        assertEquals(5,retryAmount.get());
+    }
+
     @Test
     public void testTruncateAttributes() throws IOException, InterruptedException {
         final RepositoryConfiguration config = createConfiguration();
         config.setMaxAttributeChars(50);
         config.setMaxEventFileLife(3, TimeUnit.SECONDS);
         repo = new MiNiFiPersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
-        repo.initialize(getEventReporter(), null, null);
+        repo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY);
 
+        final String maxLengthChars = "12345678901234567890123456789012345678901234567890";
         final Map<String, String> attributes = new HashMap<>();
         attributes.put("75chars", "123456789012345678901234567890123456789012345678901234567890123456789012345");
+        attributes.put("51chars", "123456789012345678901234567890123456789012345678901");
+        attributes.put("50chars", "12345678901234567890123456789012345678901234567890");
+        attributes.put("49chars", "1234567890123456789012345678901234567890123456789");
+        attributes.put("nullChar", null);
 
         final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
         builder.setEventTime(System.currentTimeMillis());
@@ -489,10 +781,65 @@
         final ProvenanceEventRecord retrieved = repo.getEvent(0L);
         assertNotNull(retrieved);
         assertEquals("12345678-0000-0000-0000-012345678912", retrieved.getAttributes().get("uuid"));
-        assertEquals("12345678901234567890123456789012345678901234567890", retrieved.getAttributes().get("75chars"));
+        assertEquals(maxLengthChars, retrieved.getAttributes().get("75chars"));
+        assertEquals(maxLengthChars, retrieved.getAttributes().get("51chars"));
+        assertEquals(maxLengthChars, retrieved.getAttributes().get("50chars"));
+        assertEquals(maxLengthChars.substring(0, 49), retrieved.getAttributes().get("49chars"));
     }
 
 
+    @Test(timeout = 15000)
+    public void testExceptionOnIndex() throws IOException {
+        final RepositoryConfiguration config = createConfiguration();
+        config.setMaxAttributeChars(50);
+        config.setMaxEventFileLife(3, TimeUnit.SECONDS);
+        config.setIndexThreadPoolSize(1);
+
+        final int numEventsToIndex = 10;
+
+        final AtomicInteger indexedEventCount = new AtomicInteger(0);
+        repo = new MiNiFiPersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS) {
+            @Override
+            protected synchronized IndexingAction createIndexingAction() {
+                return new IndexingAction(config.getSearchableFields(), config.getSearchableAttributes()) {
+                    @Override
+                    public void index(StandardProvenanceEventRecord record, IndexWriter indexWriter, Integer blockIndex) throws IOException {
+                        final int count = indexedEventCount.incrementAndGet();
+                        if (count <= numEventsToIndex) {
+                            return;
+                        }
+
+                        throw new IOException("Unit Test - Intentional Exception");
+                    }
+                };
+            }
+        };
+        repo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY);
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("uuid", "12345678-0000-0000-0000-012345678912");
+
+        final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
+        builder.setEventTime(System.currentTimeMillis());
+        builder.setEventType(ProvenanceEventType.RECEIVE);
+        builder.setTransitUri("nifi://unit-test");
+        builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
+        builder.setComponentId("1234");
+        builder.setComponentType("dummy processor");
+
+        for (int i=0; i < 1000; i++) {
+            final ProvenanceEventRecord record = builder.build();
+            repo.registerEvent(record);
+        }
+
+        repo.waitForRollover();
+
+        assertEquals(numEventsToIndex + MiNiFiPersistentProvenanceRepository.MAX_INDEXING_FAILURE_COUNT, indexedEventCount.get());
+        assertEquals(1, reportedEvents.size());
+        final ReportedEvent event = reportedEvents.get(0);
+        assertEquals(Severity.WARNING, event.getSeverity());
+    }
+
     @Test
     public void testFailureToCreateWriterDoesNotPreventSubsequentRollover() throws IOException, InterruptedException {
         final RepositoryConfiguration config = createConfiguration();
@@ -515,7 +862,7 @@
         };
 
         // initialize with our event reporter
-        repo.initialize(getEventReporter(), null, null);
+        repo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY);
 
         // create some events in the journal files.
         final Map<String, String> attributes = new HashMap<>();
@@ -553,6 +900,7 @@
         assertEquals(0, reportedEvents.size());
     }
 
+
     private static class ReportedEvent {
         private final Severity severity;
         private final String category;
@@ -564,10 +912,12 @@
             this.message = message;
         }
 
+        @SuppressWarnings("unused")
         public String getCategory() {
             return category;
         }
 
+        @SuppressWarnings("unused")
         public String getMessage() {
             return message;
         }
@@ -576,4 +926,88 @@
             return severity;
         }
     }
+
+    private NiFiUser createUser() {
+        return new NiFiUser() {
+            @Override
+            public String getIdentity() {
+                return "unit-test";
+            }
+
+            @Override
+            public NiFiUser getChain() {
+                return null;
+            }
+
+            @Override
+            public boolean isAnonymous() {
+                return false;
+            }
+
+            @Override
+            public String getClientAddress() {
+                return null;
+            }
+
+        };
+    }
+
+    private static class TestableMiNiFiPersistentProvenanceRepository extends MiNiFiPersistentProvenanceRepository {
+
+        TestableMiNiFiPersistentProvenanceRepository() {
+            super();
+        }
+
+        TestableMiNiFiPersistentProvenanceRepository(final NiFiProperties nifiProperties) throws IOException {
+            super(nifiProperties);
+        }
+
+        TestableMiNiFiPersistentProvenanceRepository(final RepositoryConfiguration configuration, final int rolloverCheckMillis) throws IOException {
+            super(configuration, rolloverCheckMillis);
+        }
+
+        RecordWriter[] getWriters() {
+            Class klass = MiNiFiPersistentProvenanceRepository.class;
+            Field writersField;
+            RecordWriter[] writers = null;
+            try {
+                writersField = klass.getDeclaredField("writers");
+                writersField.setAccessible(true);
+                writers = (RecordWriter[]) writersField.get(this);
+            } catch (NoSuchFieldException | IllegalAccessException e) {
+                e.printStackTrace();
+            }
+            return writers;
+        }
+
+        int getRolloverCheckMillis() {
+            Class klass = MiNiFiPersistentProvenanceRepository.class;
+            java.lang.reflect.Field rolloverCheckMillisField;
+            int rolloverCheckMillis = -1;
+            try {
+                rolloverCheckMillisField = klass.getDeclaredField("rolloverCheckMillis");
+                rolloverCheckMillisField.setAccessible(true);
+                rolloverCheckMillis = (int) rolloverCheckMillisField.get(this);
+            } catch (NoSuchFieldException | IllegalAccessException e) {
+                e.printStackTrace();
+            }
+            return rolloverCheckMillis;
+        }
+
+    }
+
+    private RepositoryConfiguration createTestableRepositoryConfiguration(final NiFiProperties properties) {
+        Class klass = MiNiFiPersistentProvenanceRepository.class;
+        Method createRepositoryConfigurationMethod;
+        RepositoryConfiguration configuration = null;
+        try {
+            createRepositoryConfigurationMethod = klass.getDeclaredMethod("createRepositoryConfiguration", NiFiProperties.class);
+            createRepositoryConfigurationMethod.setAccessible(true);
+            configuration = (RepositoryConfiguration)createRepositoryConfigurationMethod.invoke(null, properties);
+        } catch (IllegalAccessException | NoSuchMethodException | InvocationTargetException e) {
+            e.printStackTrace();
+        }
+        return configuration;
+    }
+
 }
diff --git a/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/dto/RemoteProcessGroupSchemaFunction.java b/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/dto/RemoteProcessGroupSchemaFunction.java
index 444088b..e429e56 100644
--- a/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/dto/RemoteProcessGroupSchemaFunction.java
+++ b/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/dto/RemoteProcessGroupSchemaFunction.java
@@ -70,6 +70,7 @@
         map.put(RemoteProcessGroupSchema.PROXY_PORT_KEY, remoteProcessGroupDTO.getProxyPort());
         map.put(RemoteProcessGroupSchema.PROXY_USER_KEY, remoteProcessGroupDTO.getProxyUser());
         map.put(RemoteProcessGroupSchema.PROXY_PASSWORD_KEY, remoteProcessGroupDTO.getProxyPassword());
+        map.put(RemoteProcessGroupSchema.LOCAL_NETWORK_INTERFACE_KEY, remoteProcessGroupDTO.getLocalNetworkInterface());
         return new RemoteProcessGroupSchema(map);
     }
 }
diff --git a/minifi-toolkit/minifi-toolkit-configuration/src/test/java/org/apache/nifi/minifi/toolkit/configuration/ConfigMainTest.java b/minifi-toolkit/minifi-toolkit-configuration/src/test/java/org/apache/nifi/minifi/toolkit/configuration/ConfigMainTest.java
index 4d8bc66..9ae0e41 100644
--- a/minifi-toolkit/minifi-toolkit-configuration/src/test/java/org/apache/nifi/minifi/toolkit/configuration/ConfigMainTest.java
+++ b/minifi-toolkit/minifi-toolkit-configuration/src/test/java/org/apache/nifi/minifi/toolkit/configuration/ConfigMainTest.java
@@ -306,18 +306,19 @@
 
             ConfigSchema configSchemaFromCurrent = new ConfigSchema(yamlMap);
             ConfigSchema.getAllProcessGroups(configSchemaFromCurrent.getProcessGroupSchema()).stream().flatMap(p -> p.getRemoteProcessGroups().stream()).forEach(r -> {
-                clearProxyInfo(r);
+                clearV3orLaterProperties(r);
             });
 
             assertNoMapDifferences(configSchemaUpgradedFromV2.toMap(), configSchemaFromCurrent.toMap());
         }
     }
 
-    private void clearProxyInfo(RemoteProcessGroupSchema remoteProcessGroupSchema) {
+    private void clearV3orLaterProperties(RemoteProcessGroupSchema remoteProcessGroupSchema) {
         remoteProcessGroupSchema.setProxyHost(RemoteProcessGroupSchema.DEFAULT_PROXY_HOST);
         remoteProcessGroupSchema.setProxyPort(RemoteProcessGroupSchema.DEFAULT_PROXY_PORT);
         remoteProcessGroupSchema.setProxyUser(RemoteProcessGroupSchema.DEFAULT_PROXY_USER);
         remoteProcessGroupSchema.setProxyPassword(RemoteProcessGroupSchema.DEFAULT_PROXY_PASSWORD);
+        remoteProcessGroupSchema.setLocalNetworkInterface(RemoteProcessGroupSchema.DEFAULT_NETWORK_INTERFACE);
     }
 
     private void testV1YmlIfPresent(String name, Map<String, Object> yamlMap) throws IOException, SchemaLoaderException {
@@ -371,7 +372,7 @@
             }
 
             ConfigSchema.getAllProcessGroups(configSchemaFromCurrent.getProcessGroupSchema()).stream().flatMap(p -> p.getRemoteProcessGroups().stream()).forEach(r -> {
-                clearProxyInfo(r);
+                clearV3orLaterProperties(r);
                 r.setTransportProtocol(RemoteProcessGroupSchema.TransportProtocolOptions.RAW.name());
             });
             Map<String, Object> v1YamlMap = configSchemaUpgradedFromV1.toMap();
diff --git a/minifi-toolkit/minifi-toolkit-configuration/src/test/java/org/apache/nifi/minifi/toolkit/configuration/dto/RemoteProcessGroupSchemaTest.java b/minifi-toolkit/minifi-toolkit-configuration/src/test/java/org/apache/nifi/minifi/toolkit/configuration/dto/RemoteProcessGroupSchemaTest.java
index a07ccea..30dd5ae 100644
--- a/minifi-toolkit/minifi-toolkit-configuration/src/test/java/org/apache/nifi/minifi/toolkit/configuration/dto/RemoteProcessGroupSchemaTest.java
+++ b/minifi-toolkit/minifi-toolkit-configuration/src/test/java/org/apache/nifi/minifi/toolkit/configuration/dto/RemoteProcessGroupSchemaTest.java
@@ -45,6 +45,7 @@
     private String testTimeout = "11 s";
     private String testYieldPeriod = "22 s";
     private String transportProtocol = "HTTP";
+    private String localNetworkInterface = "eth0";
 
     public RemoteProcessGroupSchemaTest() {
         super(new RemoteProcessGroupSchemaFunction(new RemotePortSchemaFunction()), RemoteProcessGroupSchema::new);
@@ -137,6 +138,20 @@
         assertDtoAndMapConstructorAreSame(0);
     }
 
+    @Test
+    public void testNoLocalNetworkInterface() {
+        dto.setLocalNetworkInterface(null);
+        map.remove(RemoteProcessGroupSchema.LOCAL_NETWORK_INTERFACE_KEY);
+        assertDtoAndMapConstructorAreSame(0);
+    }
+
+    @Test
+    public void testLocalNetworkInterface() {
+        dto.setLocalNetworkInterface(localNetworkInterface);
+        map.put(RemoteProcessGroupSchema.LOCAL_NETWORK_INTERFACE_KEY, localNetworkInterface);
+        assertDtoAndMapConstructorAreSame(0);
+    }
+
     @Override
     public void assertSchemaEquals(RemoteProcessGroupSchema one, RemoteProcessGroupSchema two) {
         assertEquals(one.getName(), two.getName());
@@ -157,5 +172,6 @@
         assertEquals(one.getComment(), two.getComment());
         assertEquals(one.getTimeout(), two.getTimeout());
         assertEquals(one.getYieldPeriod(), two.getYieldPeriod());
+        assertEquals(one.getLocalNetworkInterface(), two.getLocalNetworkInterface());
     }
 }
diff --git a/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/InvokeHttpMiNiFiTemplateTest.xml b/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/InvokeHttpMiNiFiTemplateTest.xml
index 9f83883..0cc5021 100644
--- a/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/InvokeHttpMiNiFiTemplateTest.xml
+++ b/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/InvokeHttpMiNiFiTemplateTest.xml
@@ -643,6 +643,7 @@
       <targetUri>http://nifi:8080/nifi</targetUri>
       <transportProtocol>HTTP</transportProtocol>
       <yieldDuration>10 sec</yieldDuration>
+      <networkInterface></networkInterface>
     </remoteProcessGroups>
   </snippet>
   <timestamp>07/04/2016 21:37:51 UTC</timestamp>
diff --git a/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/InvokeHttpMiNiFiTemplateTest.yml b/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/InvokeHttpMiNiFiTemplateTest.yml
index a043428..b3d3e51 100644
--- a/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/InvokeHttpMiNiFiTemplateTest.yml
+++ b/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/InvokeHttpMiNiFiTemplateTest.yml
@@ -295,6 +295,7 @@
   proxy port: 8675
   proxy user: username
   proxy password: ''
+  local network interface: ''
   Input Ports:
   - name: response
     id: b23a4621-cf19-42e6-967c-ffd3716e6a24
diff --git a/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/ProcessGroupsAndRemoteProcessGroups.xml b/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/ProcessGroupsAndRemoteProcessGroups.xml
index d87c9b2..3c79777 100644
--- a/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/ProcessGroupsAndRemoteProcessGroups.xml
+++ b/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/ProcessGroupsAndRemoteProcessGroups.xml
@@ -364,6 +364,7 @@
               <proxyUser></proxyUser>
               <targetUri>http://localhost:9091/nifi</targetUri>
               <transportProtocol>RAW</transportProtocol>
+              <localNetworkInterface>en0</localNetworkInterface>
               <yieldDuration>10 sec</yieldDuration>
             </remoteProcessGroups>
           </contents>
@@ -440,6 +441,7 @@
           </contents>
           <proxyHost></proxyHost>
           <proxyUser></proxyUser>
+          <localNetworkInterface>en1</localNetworkInterface>
           <targetUri>http://localhost:9090/nifi</targetUri>
           <transportProtocol>HTTP</transportProtocol>
           <yieldDuration>10 sec</yieldDuration>
diff --git a/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/ProcessGroupsAndRemoteProcessGroups.yml b/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/ProcessGroupsAndRemoteProcessGroups.yml
index 2b020c8..64de77c 100644
--- a/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/ProcessGroupsAndRemoteProcessGroups.yml
+++ b/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/ProcessGroupsAndRemoteProcessGroups.yml
@@ -189,6 +189,7 @@
       proxy port: ''
       proxy user: ''
       proxy password: ''
+      local network interface: 'en0'
       Input Ports:
       - id: 21a39aba-0158-1000-a1a0-1b55bcddcd72
         name: input2
@@ -254,6 +255,7 @@
     proxy port: ''
     proxy user: ''
     proxy password: ''
+    local network interface: 'en1'
     Input Ports:
     - id: 21a2fb5e-0158-1000-3b5e-5a7d3aaee01b
       name: input
diff --git a/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/SimpleRPGToLogAttributes.yml b/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/SimpleRPGToLogAttributes.yml
index a0458cc..7aba870 100644
--- a/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/SimpleRPGToLogAttributes.yml
+++ b/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/SimpleRPGToLogAttributes.yml
@@ -101,6 +101,7 @@
   proxy port: ''
   proxy user: ''
   proxy password: ''
+  local network interface: ''
   Input Ports: []
   Output Ports:
   - id: 6b965db7-015a-1000-4c43-9d20931a1b9c
diff --git a/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/SimpleTailFileToRPG.xml b/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/SimpleTailFileToRPG.xml
index da88f2d..4e804be 100644
--- a/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/SimpleTailFileToRPG.xml
+++ b/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/SimpleTailFileToRPG.xml
@@ -207,6 +207,7 @@
       <targetUri>http://localhost:8080/nifi</targetUri>
       <transportProtocol>RAW</transportProtocol>
       <yieldDuration>10 sec</yieldDuration>
+      <networkInterface></networkInterface>
     </remoteProcessGroups>
   </snippet>
   <timestamp>11/29/2016 10:43:33 EST</timestamp>
diff --git a/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/SimpleTailFileToRPG.yml b/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/SimpleTailFileToRPG.yml
index 3ca73ca..9ea68d7 100644
--- a/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/SimpleTailFileToRPG.yml
+++ b/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/SimpleTailFileToRPG.yml
@@ -106,6 +106,7 @@
   proxy port: ''
   proxy user: ''
   proxy password: ''
+  local network interface: ''
   Input Ports:
   - id: aca664f8-0158-1000-a139-92485891d349
     name: test2
diff --git a/pom.xml b/pom.xml
index 7ea5a2c..65d3bc7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -97,10 +97,10 @@
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
         <inceptionYear>2016</inceptionYear>
-        <org.slf4j.version>1.7.12</org.slf4j.version>
-        <org.apache.nifi.version>1.1.0</org.apache.nifi.version>
-        <logback.version>1.1.8</logback.version>
-        <jetty.version>9.3.9.v20160517</jetty.version>
+        <org.slf4j.version>1.7.25</org.slf4j.version>
+        <org.apache.nifi.version>1.2.0</org.apache.nifi.version>
+        <logback.version>1.2.3</logback.version>
+        <jetty.version>9.4.3.v20170317</jetty.version>
         <jersey.version>1.19</jersey.version>
         <yammer.metrics.version>2.2.0</yammer.metrics.version>
         <spring.version>4.2.4.RELEASE</spring.version>
@@ -563,6 +563,11 @@
                 <version>${org.apache.nifi.version}</version>
                 <type>nar</type>
             </dependency>
+            <dependency>
+                <groupId>org.apache.nifi</groupId>
+                <artifactId>nifi-distributed-cache-client-service-api</artifactId>
+                <version>${org.apache.nifi.version}</version>
+            </dependency>
 
             <!-- Manage provided dependencies in lib -->
             <dependency>
@@ -1301,7 +1306,7 @@
             <plugin>
                 <groupId>org.apache.nifi</groupId>
                 <artifactId>nifi-nar-maven-plugin</artifactId>
-                <version>1.1.0</version>
+                <version>1.2.0</version>
                 <extensions>true</extensions>
             </plugin>
             <plugin>