NIFI-9200: Free cache on heap after disabling AbstractCSVLookupService
This closes #5372.
Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/CSVRecordLookupService.java b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/CSVRecordLookupService.java
index 388ae68..d640fe9 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/CSVRecordLookupService.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/CSVRecordLookupService.java
@@ -21,6 +21,7 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.ConfigurationContext;
@@ -138,7 +139,7 @@
return Optional.empty();
}
- final String key = (String)coordinates.get(KEY);
+ final String key = (String) coordinates.get(KEY);
if (StringUtils.isBlank(key)) {
return Optional.empty();
}
@@ -159,4 +160,14 @@
return REQUIRED_KEYS;
}
+ @OnDisabled
+ public void onDisabled() {
+ cache = null;
+ }
+
+ // VisibleForTesting
+ boolean isCaching() {
+ return cache != null;
+ }
+
}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/SimpleCsvFileLookupService.java b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/SimpleCsvFileLookupService.java
index 3a9ace3..76185f5 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/SimpleCsvFileLookupService.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/SimpleCsvFileLookupService.java
@@ -16,6 +16,20 @@
*/
package org.apache.nifi.lookup;
+import org.apache.commons.csv.CSVRecord;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.ControllerServiceInitializationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.reporting.InitializationException;
+
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
@@ -30,20 +44,6 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;
-import org.apache.commons.csv.CSVRecord;
-import org.apache.commons.lang3.StringUtils;
-
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.lifecycle.OnEnabled;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.controller.ControllerServiceInitializationContext;
-import org.apache.nifi.expression.ExpressionLanguageScope;
-import org.apache.nifi.controller.ConfigurationContext;
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.reporting.InitializationException;
-
@Tags({"lookup", "cache", "enrich", "join", "csv", "reloadable", "key", "value"})
@CapabilityDescription("A reloadable CSV file-based lookup service. The first line of the csv file is considered as " +
"header.")
@@ -111,7 +111,7 @@
}
@OnEnabled
- public void onEnabled(final ConfigurationContext context) throws IOException, InitializationException {
+ public void onEnabled(final ConfigurationContext context) throws IOException, InitializationException {
super.onEnabled(context);
this.lookupValueColumn = context.getProperty(LOOKUP_VALUE_COLUMN).evaluateAttributeExpressions().getValue();
try {
@@ -148,4 +148,13 @@
return REQUIRED_KEYS;
}
+ @OnDisabled
+ public void onDisabled() {
+ cache = null;
+ }
+
+ // VisibleForTesting
+ boolean isCaching() {
+ return cache != null;
+ }
}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/java/org/apache/nifi/lookup/TestCSVRecordLookupService.java b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/java/org/apache/nifi/lookup/TestCSVRecordLookupService.java
index 4ef84c6..b95bf07 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/java/org/apache/nifi/lookup/TestCSVRecordLookupService.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/java/org/apache/nifi/lookup/TestCSVRecordLookupService.java
@@ -16,34 +16,34 @@
*/
package org.apache.nifi.lookup;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.Optional;
-
import org.apache.nifi.csv.CSVUtils;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
-
import org.junit.Test;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Optional;
+
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
public class TestCSVRecordLookupService {
private final static Optional<Record> EMPTY_RECORD = Optional.empty();
@Test
- public void testSimpleCsvFileLookupService() throws InitializationException, IOException, LookupFailureException {
+ public void testSimpleCsvRecordLookupService() throws InitializationException, IOException, LookupFailureException {
final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
final CSVRecordLookupService service = new CSVRecordLookupService();
- runner.addControllerService("csv-file-lookup-service", service);
+ runner.addControllerService("csv-record-lookup-service", service);
runner.setProperty(service, CSVRecordLookupService.CSV_FILE, "src/test/resources/test.csv");
runner.setProperty(service, CSVRecordLookupService.CSV_FORMAT, "RFC4180");
runner.setProperty(service, CSVRecordLookupService.LOOKUP_KEY_COLUMN, "key");
@@ -53,7 +53,7 @@
final CSVRecordLookupService lookupService =
(CSVRecordLookupService) runner.getProcessContext()
.getControllerServiceLookup()
- .getControllerService("csv-file-lookup-service");
+ .getControllerService("csv-record-lookup-service");
assertThat(lookupService, instanceOf(LookupService.class));
@@ -70,11 +70,11 @@
}
@Test
- public void testSimpleCsvFileLookupServiceWithCharset() throws InitializationException, IOException, LookupFailureException {
+ public void testSimpleCsvRecordLookupServiceWithCharset() throws InitializationException, LookupFailureException {
final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
final CSVRecordLookupService service = new CSVRecordLookupService();
- runner.addControllerService("csv-file-lookup-service", service);
+ runner.addControllerService("csv-record-lookup-service", service);
runner.setProperty(service, CSVRecordLookupService.CSV_FILE, "src/test/resources/test_Windows-31J.csv");
runner.setProperty(service, CSVRecordLookupService.CSV_FORMAT, "RFC4180");
runner.setProperty(service, CSVRecordLookupService.CHARSET, "Windows-31J");
@@ -111,5 +111,21 @@
assertEquals("my_value with an escaped |.", my_key.get().getAsString("value"));
}
+ @Test
+ public void testCacheIsClearedWhenDisableService() throws InitializationException {
+ final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
+ final CSVRecordLookupService service = new CSVRecordLookupService();
+ runner.addControllerService("csv-record-lookup-service", service);
+ runner.setProperty(service, CSVRecordLookupService.CSV_FILE, "src/test/resources/test.csv");
+ runner.setProperty(service, CSVRecordLookupService.CSV_FORMAT, "RFC4180");
+ runner.setProperty(service, CSVRecordLookupService.LOOKUP_KEY_COLUMN, "key");
+ runner.enableControllerService(service);
+ runner.assertValid(service);
+ assertTrue(service.isCaching());
+
+ runner.disableControllerService(service);
+
+ assertFalse(service.isCaching());
+ }
}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/java/org/apache/nifi/lookup/TestSimpleCsvFileLookupService.java b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/java/org/apache/nifi/lookup/TestSimpleCsvFileLookupService.java
index a7a90fe..18bc2a3 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/java/org/apache/nifi/lookup/TestSimpleCsvFileLookupService.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/java/org/apache/nifi/lookup/TestSimpleCsvFileLookupService.java
@@ -16,28 +16,29 @@
*/
package org.apache.nifi.lookup;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.Optional;
-
import org.apache.nifi.csv.CSVUtils;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
-
import org.junit.Test;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Optional;
+
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
public class TestSimpleCsvFileLookupService {
final static Optional<String> EMPTY_STRING = Optional.empty();
@Test
- public void testSimpleCsvFileLookupService() throws InitializationException, IOException, LookupFailureException {
+ public void testSimpleCsvFileLookupService() throws InitializationException, LookupFailureException {
final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
final SimpleCsvFileLookupService service = new SimpleCsvFileLookupService();
@@ -50,9 +51,9 @@
runner.assertValid(service);
final SimpleCsvFileLookupService lookupService =
- (SimpleCsvFileLookupService) runner.getProcessContext()
- .getControllerServiceLookup()
- .getControllerService("csv-file-lookup-service");
+ (SimpleCsvFileLookupService) runner.getProcessContext()
+ .getControllerServiceLookup()
+ .getControllerService("csv-file-lookup-service");
assertThat(lookupService, instanceOf(LookupService.class));
@@ -106,4 +107,22 @@
final Optional<String> value = service.lookup(Collections.singletonMap("key", "my_key"));
assertEquals(Optional.of("my_value with an escaped |."), value);
}
+
+ @Test
+ public void testCacheIsClearedWhenDisableService() throws InitializationException {
+ final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
+ final CSVRecordLookupService service = new CSVRecordLookupService();
+ runner.addControllerService("csv-file-lookup-service", service);
+ runner.setProperty(service, CSVRecordLookupService.CSV_FILE, "src/test/resources/test.csv");
+ runner.setProperty(service, CSVRecordLookupService.CSV_FORMAT, "RFC4180");
+ runner.setProperty(service, CSVRecordLookupService.LOOKUP_KEY_COLUMN, "key");
+ runner.enableControllerService(service);
+ runner.assertValid(service);
+
+ assertTrue(service.isCaching());
+
+ runner.disableControllerService(service);
+
+ assertFalse(service.isCaching());
+ }
}