NIFI-1420 Fixing minor bugs in GetSplunk
- Adding a Time Zone property so the Managed time ranges use the provided time zone when formatting the date strings
- Adding a Time Field Strategy property to choose between searching event time or index time
- Making the next iteration use previousLastTime + 1 ms to avoid overlap
- Fixing bug where GetSplunk incorrectly cleared state on a restart of NiFi
- This closes #299
diff --git a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/GetSplunk.java b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/GetSplunk.java
index b9d9e0b..4919e61 100644
--- a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/GetSplunk.java
+++ b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/GetSplunk.java
@@ -54,6 +54,7 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -64,6 +65,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.TimeZone;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 @TriggerSerially
@@ -113,6 +115,19 @@
             .required(true)
             .build();
 
+    public static final AllowableValue EVENT_TIME_VALUE = new AllowableValue("Event Time", "Event Time",
+            "Search based on the time of the event which may be different than when the event was indexed.");
+    public static final AllowableValue INDEX_TIME_VALUE = new AllowableValue("Index Time", "Index Time",
+            "Search based on the time the event was indexed in Splunk.");
+
+    public static final PropertyDescriptor TIME_FIELD_STRATEGY = new PropertyDescriptor.Builder()
+            .name("Time Field Strategy")
+            .description("Indicates whether to search by the time attached to the event, or by the time the event was indexed in Splunk.")
+            .allowableValues(EVENT_TIME_VALUE, INDEX_TIME_VALUE)
+            .defaultValue(EVENT_TIME_VALUE.getValue())
+            .required(true)
+            .build();
+
     public static final AllowableValue MANAGED_BEGINNING_VALUE = new AllowableValue("Managed from Beginning", "Managed from Beginning",
             "The processor will manage the date ranges of the query starting from the beginning of time.");
     public static final AllowableValue MANAGED_CURRENT_VALUE = new AllowableValue("Managed from Current", "Managed from Current",
@@ -147,6 +162,13 @@
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .required(false)
             .build();
+    public static final PropertyDescriptor TIME_ZONE = new PropertyDescriptor.Builder()
+            .name("Time Zone")
+            .description("The Time Zone to use for formatting dates when performing a search. Only used with Managed time strategies.")
+            .allowableValues(TimeZone.getAvailableIDs())
+            .defaultValue("UTC")
+            .required(true)
+            .build();
     public static final PropertyDescriptor APP = new PropertyDescriptor.Builder()
             .name("Application")
             .description("The Splunk Application to query.")
@@ -213,7 +235,7 @@
             .description("Results retrieved from Splunk are sent out this relationship.")
             .build();
 
-    public static final String DATE_TIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
+    public static final String DATE_TIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSZ";
     public static final String EARLIEST_TIME_KEY = "earliestTime";
     public static final String LATEST_TIME_KEY = "latestTime";
 
@@ -236,9 +258,11 @@
         descriptors.add(HOSTNAME);
         descriptors.add(PORT);
         descriptors.add(QUERY);
+        descriptors.add(TIME_FIELD_STRATEGY);
         descriptors.add(TIME_RANGE_STRATEGY);
         descriptors.add(EARLIEST_TIME);
         descriptors.add(LATEST_TIME);
+        descriptors.add(TIME_ZONE);
         descriptors.add(APP);
         descriptors.add(OWNER);
         descriptors.add(TOKEN);
@@ -290,13 +314,16 @@
 
     @Override
     public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
-        if ( ((oldValue != null && !oldValue.equals(newValue)) || (oldValue == null && newValue != null))
+        if ( ((oldValue != null && !oldValue.equals(newValue)))
                 && (descriptor.equals(QUERY)
+                || descriptor.equals(TIME_FIELD_STRATEGY)
                 || descriptor.equals(TIME_RANGE_STRATEGY)
                 || descriptor.equals(EARLIEST_TIME)
                 || descriptor.equals(LATEST_TIME)
                 || descriptor.equals(HOSTNAME))
                 ) {
+            getLogger().debug("A property that require resetting state was modified - {} oldValue {} newValue {}",
+                    new Object[] {descriptor.getDisplayName(), oldValue, newValue});
             resetState = true;
         }
     }
@@ -311,6 +338,7 @@
         // if properties changed since last execution then remove any previous state
         if (resetState) {
             try {
+                getLogger().debug("Clearing state based on property modifications");
                 context.getStateManager().clear(Scope.CLUSTER);
             } catch (final IOException ioe) {
                 getLogger().warn("Failed to clear state", ioe);
@@ -351,6 +379,8 @@
         final String query = context.getProperty(QUERY).getValue();
         final String outputMode = context.getProperty(OUTPUT_MODE).getValue();
         final String timeRangeStrategy = context.getProperty(TIME_RANGE_STRATEGY).getValue();
+        final String timeZone = context.getProperty(TIME_ZONE).getValue();
+        final String timeFieldStrategy = context.getProperty(TIME_FIELD_STRATEGY).getValue();
 
         final JobExportArgs exportArgs = new JobExportArgs();
         exportArgs.setSearchMode(JobExportArgs.SearchMode.NORMAL);
@@ -368,6 +398,7 @@
                 // not provided so we need to check the previous state
                 final TimeRange previousRange = loadState(context.getStateManager());
                 final SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_TIME_FORMAT);
+                dateFormat.setTimeZone(TimeZone.getTimeZone(timeZone));
 
                 if (previousRange == null) {
                     // no previous state so set the earliest time based on the strategy
@@ -386,9 +417,16 @@
                     }
 
                 } else {
-                    // we have previous state so set earliestTime to latestTime of last range
-                    earliestTime = previousRange.getLatestTime();
-                    latestTime = dateFormat.format(new Date(currentTime));
+                    // we have previous state so set earliestTime to the last range's latestTime + 1 ms so consecutive ranges do not overlap
+                    try {
+                        final String previousLastTime = previousRange.getLatestTime();
+                        final Date previousLastDate = dateFormat.parse(previousLastTime);
+
+                        earliestTime = dateFormat.format(new Date(previousLastDate.getTime() + 1));
+                        latestTime = dateFormat.format(new Date(currentTime));
+                    } catch (ParseException e) {
+                       throw new ProcessException(e);
+                    }
                 }
 
             } catch (IOException e) {
@@ -399,14 +437,26 @@
         }
 
         if (!StringUtils.isBlank(earliestTime)) {
-            exportArgs.setEarliestTime(earliestTime);
+            if (EVENT_TIME_VALUE.getValue().equalsIgnoreCase(timeFieldStrategy)) {
+                exportArgs.setEarliestTime(earliestTime);
+            } else {
+                exportArgs.setIndexEarliest(earliestTime);
+            }
         }
 
         if (!StringUtils.isBlank(latestTime)) {
-            exportArgs.setLatestTime(latestTime);
+            if (EVENT_TIME_VALUE.getValue().equalsIgnoreCase(timeFieldStrategy)) {
+                exportArgs.setLatestTime(latestTime);
+            } else {
+                exportArgs.setIndexLatest(latestTime);
+            }
         }
 
-        getLogger().debug("Using earliestTime of {} and latestTime of {}", new Object[] {earliestTime, latestTime});
+        if (EVENT_TIME_VALUE.getValue().equalsIgnoreCase(timeFieldStrategy)) {
+            getLogger().debug("Using earliest_time of {} and latest_time of {}", new Object[]{earliestTime, latestTime});
+        } else {
+            getLogger().debug("Using index_earliest of {} and index_latest of {}", new Object[]{earliestTime, latestTime});
+        }
 
         final InputStream exportSearch = splunkService.export(query, exportArgs);
 
diff --git a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/test/java/org/apache/nifi/processors/splunk/TestGetSplunk.java b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/test/java/org/apache/nifi/processors/splunk/TestGetSplunk.java
index 42daab6..5ad7881 100644
--- a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/test/java/org/apache/nifi/processors/splunk/TestGetSplunk.java
+++ b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/test/java/org/apache/nifi/processors/splunk/TestGetSplunk.java
@@ -36,7 +36,11 @@
 
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
 import java.util.List;
+import java.util.TimeZone;
 
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.argThat;
@@ -149,7 +153,7 @@
     }
 
     @Test
-    public void testGetWithManagedFromBeginning() {
+    public void testGetWithManagedFromBeginning() throws ParseException {
         final String query = "search tcp:7879";
         final String outputMode = GetSplunk.ATOM_VALUE.getValue();
 
@@ -176,7 +180,13 @@
         Assert.assertNotNull(actualArgs1.get("latest_time"));
 
         // save the latest time from the first run which should be earliest time of next run
-        final String expectedLatest = (String) actualArgs1.get("latest_time");
+        final String lastLatest = (String) actualArgs1.get("latest_time");
+
+        final SimpleDateFormat format = new SimpleDateFormat(GetSplunk.DATE_TIME_FORMAT);
+        format.setTimeZone(TimeZone.getTimeZone("UTC"));
+
+        final Date lastLatestDate = format.parse(lastLatest);
+        final String expectedLatest = format.format(new Date(lastLatestDate.getTime() + 1));
 
         // run again
         runner.run(1, false);
@@ -193,7 +203,109 @@
     }
 
     @Test
-    public void testGetWithManagedFromCurrent() throws IOException {
+    public void testGetWithManagedFromBeginningWithDifferentTimeZone() throws ParseException {
+        final String query = "search tcp:7879";
+        final String outputMode = GetSplunk.ATOM_VALUE.getValue();
+        final TimeZone timeZone = TimeZone.getTimeZone("PST");
+
+        runner.setProperty(GetSplunk.QUERY, query);
+        runner.setProperty(GetSplunk.OUTPUT_MODE, outputMode);
+        runner.setProperty(GetSplunk.TIME_RANGE_STRATEGY, GetSplunk.MANAGED_BEGINNING_VALUE.getValue());
+        runner.setProperty(GetSplunk.TIME_ZONE, timeZone.getID());
+
+        final String resultContent = "fake results";
+        final ByteArrayInputStream input = new ByteArrayInputStream(resultContent.getBytes(StandardCharsets.UTF_8));
+        when(service.export(eq(query), any(JobExportArgs.class))).thenReturn(input);
+
+        // run once and don't shut down
+        runner.run(1, false);
+        runner.assertAllFlowFilesTransferred(GetSplunk.REL_SUCCESS, 1);
+
+        // capture the args that were passed to export on the first run
+        final ArgumentCaptor<JobExportArgs> capture1 = ArgumentCaptor.forClass(JobExportArgs.class);
+        verify(service, times(1)).export(eq(query), capture1.capture());
+
+        // first execution with no previous state and "managed from beginning" should have a latest time and no earliest time
+        final JobExportArgs actualArgs1 = capture1.getValue();
+        Assert.assertNotNull(actualArgs1);
+        Assert.assertNull(actualArgs1.get("earliest_time"));
+        Assert.assertNotNull(actualArgs1.get("latest_time"));
+
+        // save the latest time from the first run; the next run's earliest time should be this value plus 1 ms, formatted in the configured time zone
+        final String lastLatest = (String) actualArgs1.get("latest_time");
+
+        final SimpleDateFormat format = new SimpleDateFormat(GetSplunk.DATE_TIME_FORMAT);
+        format.setTimeZone(timeZone);
+
+        final Date lastLatestDate = format.parse(lastLatest);
+        final String expectedLatest = format.format(new Date(lastLatestDate.getTime() + 1));
+
+        // run again
+        runner.run(1, false);
+        runner.assertAllFlowFilesTransferred(GetSplunk.REL_SUCCESS, 2);
+
+        final ArgumentCaptor<JobExportArgs> capture2 = ArgumentCaptor.forClass(JobExportArgs.class);
+        verify(service, times(2)).export(eq(query), capture2.capture());
+
+        // on the second execution the earliest time should be the previous latest_time plus 1 ms
+        final JobExportArgs actualArgs2 = capture2.getValue();
+        Assert.assertNotNull(actualArgs2);
+        Assert.assertEquals(expectedLatest, actualArgs2.get("earliest_time"));
+        Assert.assertNotNull(actualArgs2.get("latest_time"));
+    }
+
+    @Test
+    public void testGetWithManagedFromBeginningWithShutdown() throws ParseException {
+        final String query = "search tcp:7879";
+        final String outputMode = GetSplunk.ATOM_VALUE.getValue();
+
+        runner.setProperty(GetSplunk.QUERY, query);
+        runner.setProperty(GetSplunk.OUTPUT_MODE, outputMode);
+        runner.setProperty(GetSplunk.TIME_RANGE_STRATEGY, GetSplunk.MANAGED_BEGINNING_VALUE.getValue());
+
+        final String resultContent = "fake results";
+        final ByteArrayInputStream input = new ByteArrayInputStream(resultContent.getBytes(StandardCharsets.UTF_8));
+        when(service.export(eq(query), any(JobExportArgs.class))).thenReturn(input);
+
+        // run once and shut down
+        runner.run(1, true);
+        runner.assertAllFlowFilesTransferred(GetSplunk.REL_SUCCESS, 1);
+
+        // capture the args that were passed to export on the first run
+        final ArgumentCaptor<JobExportArgs> capture1 = ArgumentCaptor.forClass(JobExportArgs.class);
+        verify(service, times(1)).export(eq(query), capture1.capture());
+
+        // first execution with no previous state and "managed from beginning" should have a latest time and no earliest time
+        final JobExportArgs actualArgs1 = capture1.getValue();
+        Assert.assertNotNull(actualArgs1);
+        Assert.assertNull(actualArgs1.get("earliest_time"));
+        Assert.assertNotNull(actualArgs1.get("latest_time"));
+
+        // save the latest time from the first run; the next run's earliest time should be this value plus 1 ms
+        final String lastLatest = (String) actualArgs1.get("latest_time");
+
+        final SimpleDateFormat format = new SimpleDateFormat(GetSplunk.DATE_TIME_FORMAT);
+        format.setTimeZone(TimeZone.getTimeZone("UTC"));
+
+        final Date lastLatestDate = format.parse(lastLatest);
+        final String expectedLatest = format.format(new Date(lastLatestDate.getTime() + 1));
+
+        // run again after the shut down; the assertions below expect the state saved by the first run to still be used
+        runner.run(1, true);
+        runner.assertAllFlowFilesTransferred(GetSplunk.REL_SUCCESS, 2);
+
+        final ArgumentCaptor<JobExportArgs> capture2 = ArgumentCaptor.forClass(JobExportArgs.class);
+        verify(service, times(2)).export(eq(query), capture2.capture());
+
+        // on the second execution the earliest time should be the previous latest_time plus 1 ms
+        final JobExportArgs actualArgs2 = capture2.getValue();
+        Assert.assertNotNull(actualArgs2);
+        Assert.assertEquals(expectedLatest, actualArgs2.get("earliest_time"));
+        Assert.assertNotNull(actualArgs2.get("latest_time"));
+    }
+
+    @Test
+    public void testGetWithManagedFromCurrentUsingEventTime() throws IOException, ParseException {
         final String query = "search tcp:7879";
         final String outputMode = GetSplunk.ATOM_VALUE.getValue();
 
@@ -217,7 +329,13 @@
         Assert.assertTrue(state.getVersion() > 0);
 
         // save the latest time from the first run which should be earliest time of next run
-        final String expectedLatest = state.get(GetSplunk.LATEST_TIME_KEY);
+        final String lastLatest = state.get(GetSplunk.LATEST_TIME_KEY);
+
+        final SimpleDateFormat format = new SimpleDateFormat(GetSplunk.DATE_TIME_FORMAT);
+        format.setTimeZone(TimeZone.getTimeZone("UTC"));
+
+        final Date lastLatestDate = format.parse(lastLatest);
+        final String expectedLatest = format.format(new Date(lastLatestDate.getTime() + 1));
 
         // run again
         runner.run(1, false);
@@ -233,6 +351,55 @@
         Assert.assertNotNull(actualArgs.get("latest_time"));
     }
 
+    @Test
+    public void testGetWithManagedFromCurrentUsingIndexTime() throws IOException, ParseException {
+        final String query = "search tcp:7879";
+        final String outputMode = GetSplunk.ATOM_VALUE.getValue();
+
+        runner.setProperty(GetSplunk.QUERY, query);
+        runner.setProperty(GetSplunk.OUTPUT_MODE, outputMode);
+        runner.setProperty(GetSplunk.TIME_RANGE_STRATEGY, GetSplunk.MANAGED_CURRENT_VALUE.getValue());
+        runner.setProperty(GetSplunk.TIME_FIELD_STRATEGY, GetSplunk.INDEX_TIME_VALUE.getValue());
+
+        final String resultContent = "fake results";
+        final ByteArrayInputStream input = new ByteArrayInputStream(resultContent.getBytes(StandardCharsets.UTF_8));
+        when(service.export(eq(query), any(JobExportArgs.class))).thenReturn(input);
+
+        // run once and don't shut down, shouldn't produce any results first time
+        runner.run(1, false);
+        runner.assertAllFlowFilesTransferred(GetSplunk.REL_SUCCESS, 0);
+
+        // the first run should not have called export at all
+        verify(service, times(0)).export(eq(query), any(JobExportArgs.class));
+
+        final StateMap state = runner.getStateManager().getState(Scope.CLUSTER);
+        Assert.assertNotNull(state);
+        Assert.assertTrue(state.getVersion() > 0);
+
+        // save the latest time stored in state by the first run; the next run's index_earliest should be this value plus 1 ms
+        final String lastLatest = state.get(GetSplunk.LATEST_TIME_KEY);
+
+        final SimpleDateFormat format = new SimpleDateFormat(GetSplunk.DATE_TIME_FORMAT);
+        format.setTimeZone(TimeZone.getTimeZone("UTC"));
+
+        final Date lastLatestDate = format.parse(lastLatest);
+        final String expectedLatest = format.format(new Date(lastLatestDate.getTime() + 1));
+
+        // run again
+        runner.run(1, false);
+        runner.assertAllFlowFilesTransferred(GetSplunk.REL_SUCCESS, 1);
+
+        final ArgumentCaptor<JobExportArgs> capture = ArgumentCaptor.forClass(JobExportArgs.class);
+        verify(service, times(1)).export(eq(query), capture.capture());
+
+        // on the second execution index_earliest should be the latest time saved by the first run plus 1 ms
+        final JobExportArgs actualArgs = capture.getValue();
+        Assert.assertNotNull(actualArgs);
+
+        Assert.assertEquals(expectedLatest, actualArgs.get("index_earliest"));
+        Assert.assertNotNull(actualArgs.get("index_latest"));
+    }
+
 
     /**
      * Testable implementation of GetSplunk to return a Mock Splunk Service.