NIFI-9200: Free cache on heap after disabling AbstractCSVLookupService

This closes #5372.

Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/CSVRecordLookupService.java b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/CSVRecordLookupService.java
index 388ae68..d640fe9 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/CSVRecordLookupService.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/CSVRecordLookupService.java
@@ -21,6 +21,7 @@
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
 import org.apache.nifi.annotation.lifecycle.OnEnabled;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.controller.ConfigurationContext;
@@ -138,7 +139,7 @@
             return Optional.empty();
         }
 
-        final String key = (String)coordinates.get(KEY);
+        final String key = (String) coordinates.get(KEY);
         if (StringUtils.isBlank(key)) {
             return Optional.empty();
         }
@@ -159,4 +160,14 @@
         return REQUIRED_KEYS;
     }
 
+    @OnDisabled
+    public void onDisabled() {
+        cache = null;
+    }
+
+    // VisibleForTesting
+    boolean isCaching() {
+        return cache != null;
+    }
+
 }
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/SimpleCsvFileLookupService.java b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/SimpleCsvFileLookupService.java
index 3a9ace3..76185f5 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/SimpleCsvFileLookupService.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/SimpleCsvFileLookupService.java
@@ -16,6 +16,20 @@
  */
 package org.apache.nifi.lookup;
 
+import org.apache.commons.csv.CSVRecord;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.ControllerServiceInitializationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.reporting.InitializationException;
+
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
@@ -30,20 +44,6 @@
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import org.apache.commons.csv.CSVRecord;
-import org.apache.commons.lang3.StringUtils;
-
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.lifecycle.OnEnabled;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.controller.ControllerServiceInitializationContext;
-import org.apache.nifi.expression.ExpressionLanguageScope;
-import org.apache.nifi.controller.ConfigurationContext;
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.reporting.InitializationException;
-
 @Tags({"lookup", "cache", "enrich", "join", "csv", "reloadable", "key", "value"})
 @CapabilityDescription("A reloadable CSV file-based lookup service. The first line of the csv file is considered as " +
         "header.")
@@ -111,7 +111,7 @@
     }
 
     @OnEnabled
-    public void onEnabled(final ConfigurationContext context) throws  IOException, InitializationException {
+    public void onEnabled(final ConfigurationContext context) throws IOException, InitializationException {
         super.onEnabled(context);
         this.lookupValueColumn = context.getProperty(LOOKUP_VALUE_COLUMN).evaluateAttributeExpressions().getValue();
         try {
@@ -148,4 +148,13 @@
         return REQUIRED_KEYS;
     }
 
+    @OnDisabled
+    public void onDisabled() {
+        cache = null;
+    }
+
+    // VisibleForTesting
+    boolean isCaching() {
+        return cache != null;
+    }
 }
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/java/org/apache/nifi/lookup/TestCSVRecordLookupService.java b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/java/org/apache/nifi/lookup/TestCSVRecordLookupService.java
index 4ef84c6..b95bf07 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/java/org/apache/nifi/lookup/TestCSVRecordLookupService.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/java/org/apache/nifi/lookup/TestCSVRecordLookupService.java
@@ -16,34 +16,34 @@
  */
 package org.apache.nifi.lookup;
 
-import java.io.IOException;
-import java.util.Collections;
-import java.util.Optional;
-
 import org.apache.nifi.csv.CSVUtils;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
-
 import org.junit.Test;
 
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Optional;
+
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
 
 public class TestCSVRecordLookupService {
 
     private final static Optional<Record> EMPTY_RECORD = Optional.empty();
 
     @Test
-    public void testSimpleCsvFileLookupService() throws InitializationException, IOException, LookupFailureException {
+    public void testSimpleCsvRecordLookupService() throws InitializationException, IOException, LookupFailureException {
         final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
         final CSVRecordLookupService service = new CSVRecordLookupService();
 
-        runner.addControllerService("csv-file-lookup-service", service);
+        runner.addControllerService("csv-record-lookup-service", service);
         runner.setProperty(service, CSVRecordLookupService.CSV_FILE, "src/test/resources/test.csv");
         runner.setProperty(service, CSVRecordLookupService.CSV_FORMAT, "RFC4180");
         runner.setProperty(service, CSVRecordLookupService.LOOKUP_KEY_COLUMN, "key");
@@ -53,7 +53,7 @@
         final CSVRecordLookupService lookupService =
                 (CSVRecordLookupService) runner.getProcessContext()
                         .getControllerServiceLookup()
-                        .getControllerService("csv-file-lookup-service");
+                        .getControllerService("csv-record-lookup-service");
 
         assertThat(lookupService, instanceOf(LookupService.class));
 
@@ -70,11 +70,11 @@
     }
 
     @Test
-    public void testSimpleCsvFileLookupServiceWithCharset() throws InitializationException, IOException, LookupFailureException {
+    public void testSimpleCsvRecordLookupServiceWithCharset() throws InitializationException, LookupFailureException {
         final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
         final CSVRecordLookupService service = new CSVRecordLookupService();
 
-        runner.addControllerService("csv-file-lookup-service", service);
+        runner.addControllerService("csv-record-lookup-service", service);
         runner.setProperty(service, CSVRecordLookupService.CSV_FILE, "src/test/resources/test_Windows-31J.csv");
         runner.setProperty(service, CSVRecordLookupService.CSV_FORMAT, "RFC4180");
         runner.setProperty(service, CSVRecordLookupService.CHARSET, "Windows-31J");
@@ -111,5 +111,21 @@
         assertEquals("my_value with an escaped |.", my_key.get().getAsString("value"));
     }
 
+    @Test
+    public void testCacheIsClearedWhenDisableService() throws InitializationException {
+        final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
+        final CSVRecordLookupService service = new CSVRecordLookupService();
+        runner.addControllerService("csv-record-lookup-service", service);
+        runner.setProperty(service, CSVRecordLookupService.CSV_FILE, "src/test/resources/test.csv");
+        runner.setProperty(service, CSVRecordLookupService.CSV_FORMAT, "RFC4180");
+        runner.setProperty(service, CSVRecordLookupService.LOOKUP_KEY_COLUMN, "key");
+        runner.enableControllerService(service);
+        runner.assertValid(service);
 
+        assertTrue(service.isCaching());
+
+        runner.disableControllerService(service);
+
+        assertFalse(service.isCaching());
+    }
 }
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/java/org/apache/nifi/lookup/TestSimpleCsvFileLookupService.java b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/java/org/apache/nifi/lookup/TestSimpleCsvFileLookupService.java
index a7a90fe..18bc2a3 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/java/org/apache/nifi/lookup/TestSimpleCsvFileLookupService.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/java/org/apache/nifi/lookup/TestSimpleCsvFileLookupService.java
@@ -16,28 +16,29 @@
  */
 package org.apache.nifi.lookup;
 
-import java.io.IOException;
-import java.util.Collections;
-import java.util.Optional;
-
 import org.apache.nifi.csv.CSVUtils;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
-
 import org.junit.Test;
 
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Optional;
+
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
 
 public class TestSimpleCsvFileLookupService {
 
     final static Optional<String> EMPTY_STRING = Optional.empty();
 
     @Test
-    public void testSimpleCsvFileLookupService() throws InitializationException, IOException, LookupFailureException {
+    public void testSimpleCsvFileLookupService() throws InitializationException, LookupFailureException {
         final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
         final SimpleCsvFileLookupService service = new SimpleCsvFileLookupService();
 
@@ -50,9 +51,9 @@
         runner.assertValid(service);
 
         final SimpleCsvFileLookupService lookupService =
-            (SimpleCsvFileLookupService) runner.getProcessContext()
-                .getControllerServiceLookup()
-                .getControllerService("csv-file-lookup-service");
+                (SimpleCsvFileLookupService) runner.getProcessContext()
+                        .getControllerServiceLookup()
+                        .getControllerService("csv-file-lookup-service");
 
         assertThat(lookupService, instanceOf(LookupService.class));
 
@@ -106,4 +107,22 @@
         final Optional<String> value = service.lookup(Collections.singletonMap("key", "my_key"));
         assertEquals(Optional.of("my_value with an escaped |."), value);
     }
+
+    @Test
+    public void testCacheIsClearedWhenDisableService() throws InitializationException {
+        final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
+        final CSVRecordLookupService service = new CSVRecordLookupService();
+        runner.addControllerService("csv-file-lookup-service", service);
+        runner.setProperty(service, CSVRecordLookupService.CSV_FILE, "src/test/resources/test.csv");
+        runner.setProperty(service, CSVRecordLookupService.CSV_FORMAT, "RFC4180");
+        runner.setProperty(service, CSVRecordLookupService.LOOKUP_KEY_COLUMN, "key");
+        runner.enableControllerService(service);
+        runner.assertValid(service);
+
+        assertTrue(service.isCaching());
+
+        runner.disableControllerService(service);
+
+        assertFalse(service.isCaching());
+    }
 }