NIFI-8773: Implemented Line Start Pattern in TailFile

Each message encountered in the tailed file will be buffered (up to some configurable max) until the subsequent message arrives. At that point, the previous message will be flushed.

This closes #5251

Signed-off-by: David Handermann <exceptionfactory@apache.org>
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
index 2b1bfcd..19e5724 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
@@ -32,6 +32,7 @@
 import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyDescriptor.Builder;
 import org.apache.nifi.components.RequiredPermission;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
@@ -40,6 +41,7 @@
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
@@ -60,6 +62,7 @@
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
+import java.nio.charset.StandardCharsets;
 import java.nio.file.DirectoryStream;
 import java.nio.file.Files;
 import java.nio.file.Path;
@@ -84,6 +87,8 @@
 
 import static org.apache.nifi.expression.ExpressionLanguageScope.NONE;
 import static org.apache.nifi.expression.ExpressionLanguageScope.VARIABLE_REGISTRY;
+import static org.apache.nifi.processor.util.StandardValidators.DATA_SIZE_VALIDATOR;
+import static org.apache.nifi.processor.util.StandardValidators.REGULAR_EXPRESSION_VALIDATOR;
 
 // note: it is important that this Processor is not marked as @SupportsBatching because the session commits must complete before persisting state locally; otherwise, data loss may occur
 @TriggerSerially
@@ -108,6 +113,7 @@
 public class TailFile extends AbstractProcessor {
 
     static final String MAP_PREFIX = "file.";
+    private static final byte[] NEW_LINE_BYTES = "\n".getBytes(StandardCharsets.UTF_8);
 
     static final AllowableValue LOCATION_LOCAL = new AllowableValue("Local", "Local",
             "State is stored locally. Each node in a cluster will tail a different file.");
@@ -130,7 +136,7 @@
             "Start with the data at the end of the File to Tail. Do not ingest any data thas has already been rolled over or any "
             + "data in the File to Tail that has already been written.");
 
-    static final PropertyDescriptor BASE_DIRECTORY = new PropertyDescriptor.Builder()
+    static final PropertyDescriptor BASE_DIRECTORY = new Builder()
             .name("tail-base-directory")
             .displayName("Base directory")
             .description("Base directory used to look for files to tail. This property is required when using Multifile mode.")
@@ -139,7 +145,7 @@
             .required(false)
             .build();
 
-    static final PropertyDescriptor MODE = new PropertyDescriptor.Builder()
+    static final PropertyDescriptor MODE = new Builder()
             .name("tail-mode")
             .displayName("Tailing mode")
             .description("Mode to use: single file will tail only one file, multiple file will look for a list of file. In Multiple mode"
@@ -150,7 +156,7 @@
             .defaultValue(MODE_SINGLEFILE.getValue())
             .build();
 
-    static final PropertyDescriptor FILENAME = new PropertyDescriptor.Builder()
+    static final PropertyDescriptor FILENAME = new Builder()
             .displayName("File(s) to Tail")
             .name("File to Tail")
             .description("Path of the file to tail in case of single file mode. If using multifile mode, regular expression to find files "
@@ -161,7 +167,7 @@
             .required(true)
             .build();
 
-    static final PropertyDescriptor ROLLING_FILENAME_PATTERN = new PropertyDescriptor.Builder()
+    static final PropertyDescriptor ROLLING_FILENAME_PATTERN = new Builder()
             .name("Rolling Filename Pattern")
             .description("If the file to tail \"rolls over\" as would be the case with log files, this filename pattern will be used to "
                     + "identify files that have rolled over so that if NiFi is restarted, and the file has rolled over, it will be able to pick up where it left off. "
@@ -173,7 +179,7 @@
             .required(false)
             .build();
 
-    static final PropertyDescriptor POST_ROLLOVER_TAIL_PERIOD = new PropertyDescriptor.Builder()
+    static final PropertyDescriptor POST_ROLLOVER_TAIL_PERIOD = new Builder()
         .name("Post-Rollover Tail Period")
         .description("When a file is rolled over, the processor will continue tailing the rolled over file until it has not been modified for this amount of time. " +
             "This allows for another process to rollover a file, and then flush out any buffered data. Note that when this value is set, and the tailed file rolls over, " +
@@ -186,7 +192,7 @@
         .defaultValue("0 sec")
         .build();
 
-    static final PropertyDescriptor STATE_LOCATION = new PropertyDescriptor.Builder()
+    static final PropertyDescriptor STATE_LOCATION = new Builder()
             .displayName("State Location")
             .name("File Location") //retained name of property for backward compatibility of configs
             .description("Specifies where the state is located either local or cluster so that state can be stored "
@@ -196,7 +202,7 @@
             .defaultValue(LOCATION_LOCAL.getValue())
             .build();
 
-    static final PropertyDescriptor START_POSITION = new PropertyDescriptor.Builder()
+    static final PropertyDescriptor START_POSITION = new Builder()
             .name("Initial Start Position")
             .description("When the Processor first begins to tail data, this property specifies where the Processor should begin reading data. Once data has been ingested from a file, "
                     + "the Processor will continue from the last point from which it has received data.")
@@ -205,7 +211,7 @@
             .required(true)
             .build();
 
-    static final PropertyDescriptor RECURSIVE = new PropertyDescriptor.Builder()
+    static final PropertyDescriptor RECURSIVE = new Builder()
             .name("tailfile-recursive-lookup")
             .displayName("Recursive lookup")
             .description("When using Multiple files mode, this property defines if files must be listed recursively or not"
@@ -215,7 +221,7 @@
             .required(true)
             .build();
 
-    static final PropertyDescriptor LOOKUP_FREQUENCY = new PropertyDescriptor.Builder()
+    static final PropertyDescriptor LOOKUP_FREQUENCY = new Builder()
             .name("tailfile-lookup-frequency")
             .displayName("Lookup frequency")
             .description("Only used in Multiple files mode. It specifies the minimum "
@@ -225,7 +231,7 @@
             .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
             .build();
 
-    static final PropertyDescriptor MAXIMUM_AGE = new PropertyDescriptor.Builder()
+    static final PropertyDescriptor MAXIMUM_AGE = new Builder()
             .name("tailfile-maximum-age")
             .displayName("Maximum age")
             .description("Only used in Multiple files mode. It specifies the necessary "
@@ -237,7 +243,7 @@
             .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
             .build();
 
-    static final PropertyDescriptor REREAD_ON_NUL = new PropertyDescriptor.Builder()
+    static final PropertyDescriptor REREAD_ON_NUL = new Builder()
             .name("reread-on-nul")
             .displayName("Reread when NUL encountered")
             .description("If this option is set to 'true', when a NUL character is read, the processor will yield and try to read the same part again later. "
@@ -251,6 +257,30 @@
             .defaultValue("false")
             .build();
 
+    static final PropertyDescriptor LINE_START_PATTERN = new Builder()
+        .name("Line Start Pattern")
+        .displayName("Line Start Pattern")
+        .description("A Regular Expression to match against the start of a log line. If specified, any line that matches the expression, and any following lines, will be buffered until another line" +
+            " matches the Expression. In doing this, we can avoid splitting apart multi-line messages in the file. This assumes that the data is in UTF-8 format.")
+        .required(false)
+        .addValidator(REGULAR_EXPRESSION_VALIDATOR)
+        .expressionLanguageSupported(NONE)
+        .dependsOn(MODE, MODE_SINGLEFILE)
+        .build();
+
+    static final PropertyDescriptor MAX_BUFFER_LENGTH = new Builder()
+        .name("Max Buffer Size")
+        .displayName("Max Buffer Size")
+        .description("When using the Line Start Pattern, there may be situations in which the data in the file being tailed never matches the Regular Expression. This would result in the processor " +
+            "buffering all data from the tailed file, which can quickly exhaust the heap. To avoid this, the Processor will buffer only up to this amount of data before flushing the buffer, even if" +
+            " it means ingesting partial data from the file.")
+        .required(true)
+        .addValidator(DATA_SIZE_VALIDATOR)
+        .expressionLanguageSupported(NONE)
+        .defaultValue("64 KB")
+        .dependsOn(LINE_START_PATTERN)
+        .build();
+
     static final Relationship REL_SUCCESS = new Relationship.Builder()
             .name("success")
             .description("All FlowFiles are routed to this Relationship.")
@@ -261,6 +291,10 @@
     private volatile AtomicBoolean isMultiChanging = new AtomicBoolean(false);
     private volatile boolean requireStateLookup = true;
 
+    private volatile ByteArrayOutputStream linesBuffer = new ByteArrayOutputStream();
+    private volatile Pattern lineStartPattern;
+    private volatile long maxBufferBytes;
+
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         final List<PropertyDescriptor> properties = new ArrayList<>();
@@ -275,6 +309,8 @@
         properties.add(LOOKUP_FREQUENCY);
         properties.add(MAXIMUM_AGE);
         properties.add(REREAD_ON_NUL);
+        properties.add(LINE_START_PATTERN);
+        properties.add(MAX_BUFFER_LENGTH);
         return properties;
     }
 
@@ -341,6 +377,14 @@
     }
 
     @OnScheduled
+    public void compileRegex(final ProcessContext context) {
+        final String regex = context.getProperty(LINE_START_PATTERN).getValue();
+        lineStartPattern = (regex == null) ? null : Pattern.compile(regex);
+
+        this.maxBufferBytes = context.getProperty(MAX_BUFFER_LENGTH).asDataSize(DataUnit.B).longValue();
+    }
+
+    @OnScheduled
     public void recoverState(final ProcessContext context) throws IOException {
         // set isMultiChanging
         isMultiChanging.set(context.getProperty(MODE).getValue().equals(MODE_MULTIFILE.getValue()));
@@ -581,13 +625,16 @@
     }
 
     @OnStopped
-    public void cleanup() {
+    public void cleanup(final ProcessContext context) {
         for (TailFileObject tfo : states.values()) {
             cleanReader(tfo);
             final TailFileState state = tfo.getState();
-            tfo.setState(new TailFileState(state.getFilename(), state.getFile(), null, state.getPosition(),
+            tfo.setState(new TailFileState(state.getFilename(), state.getFile(), null, state.getPosition() - linesBuffer.size(),
                 state.getTimestamp(), state.getLength(), state.getChecksum(), state.getBuffer(), state.isTailingPostRollover()));
+            persistState(tfo, null, context);
         }
+
+        linesBuffer.reset();
     }
 
     private void cleanReader(TailFileObject tfo) {
@@ -643,7 +690,7 @@
             return;
         }
 
-        for (String tailFile : states.keySet()) {
+        for (final String tailFile : states.keySet()) {
             try {
                 processTailFile(context, session, tailFile);
             } catch (NulCharacterEncounteredException e) {
@@ -652,6 +699,12 @@
                 return;
             }
         }
+
+        // If a Line Start Pattern is being used and data is buffered, the Position that has been stored in the state will
+        // not be accurate. To address this, we call cleanup(), which will handle updating the state to the correct values for us.
+        if (lineStartPattern != null && linesBuffer.size() > 0) {
+            cleanup(context);
+        }
     }
 
     private void processTailFile(final ProcessContext context, final ProcessSession session, final String tailFile) {
@@ -667,7 +720,7 @@
             if (START_BEGINNING_OF_TIME.getValue().equals(recoverPosition)) {
                 recoverRolledFiles(context, session, tailFile, tfo.getExpectedRecoveryChecksum(), tfo.getState().getTimestamp(), tfo.getState().getPosition());
             } else if (START_CURRENT_FILE.getValue().equals(recoverPosition)) {
-                cleanup();
+                cleanup(context);
                 tfo.setState(new TailFileState(tailFile, null, null, 0L, 0L, 0L, null, tfo.getState().getBuffer()));
             } else {
                 final String filename = tailFile;
@@ -687,7 +740,7 @@
                     }
 
                     fileChannel.position(position);
-                    cleanup();
+                    cleanup(context);
                     tfo.setState(new TailFileState(filename, file, fileChannel, position, timestamp, file.length(), checksum, tfo.getState().getBuffer()));
                 } catch (final IOException ioe) {
                     getLogger().error("Attempted to position Reader at current position in file {} but failed to do so due to {}", new Object[]{file, ioe.toString()}, ioe);
@@ -847,8 +900,12 @@
             flowFile = session.putAllAttributes(flowFile, attributes);
 
             session.getProvenanceReporter().receive(flowFile, file.toURI().toString(), "FlowFile contains bytes " + position + " through " + positionHolder.get() + " of source file",
-                    TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos));
+                TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos));
             session.transfer(flowFile, REL_SUCCESS);
+            getLogger().debug("Created {} and routed to success", flowFile);
+        }
+
+        if (flowFile.getSize() > 0 || linesBuffer.size() > 0) {
             position = positionHolder.get();
 
             // Set timestamp to the latest of when the file was modified and the current timestamp stored in the state.
@@ -858,7 +915,6 @@
             // rotated file a second time.
             timestamp = Math.max(state.getTimestamp(), file.lastModified());
             length = file.length();
-            getLogger().debug("Created {} and routed to success", new Object[]{flowFile});
         }
 
         // Create a new state object to represent our current position, timestamp, etc.
@@ -930,7 +986,7 @@
                         case '\n': {
                             baos.write(ch);
                             seenCR = false;
-                            flushByteArrayOutputStream(baos, out, checksum);
+                            flushByteArrayOutputStream(baos, out, checksum, false);
                             rePos = pos + i + 1;
                             linesRead++;
                             break;
@@ -948,7 +1004,7 @@
                         default: {
                             if (seenCR) {
                                 seenCR = false;
-                                flushByteArrayOutputStream(baos, out, checksum);
+                                flushByteArrayOutputStream(baos, out, checksum, false);
                                 linesRead++;
                                 baos.write(ch);
                                 rePos = pos + i;
@@ -963,7 +1019,7 @@
             }
 
             if (readFully) {
-                flushByteArrayOutputStream(baos, out, checksum);
+                flushByteArrayOutputStream(baos, out, checksum, true);
                 rePos = reader.position();
             }
 
@@ -976,14 +1032,53 @@
         }
     }
 
-    private void flushByteArrayOutputStream(final ByteArrayOutputStream baos, final OutputStream out, final Checksum checksum) throws IOException {
-        baos.writeTo(out);
+    private void flushByteArrayOutputStream(final ByteArrayOutputStream baos, final OutputStream out, final Checksum checksum, final boolean ignoreRegex) throws IOException {
         final byte[] baosBuffer = baos.toByteArray();
-        checksum.update(baosBuffer, 0, baos.size());
+        baos.reset();
+
+        // If the regular expression is being ignored, we need to flush anything that is buffered.
+        // This happens, for example, when a file has been rolled over. At that point, we want to flush whatever we have,
+        // even if the regex hasn't been matched.
+        if (ignoreRegex) {
+            flushLinesBuffer(out, checksum);
+        }
+
+        if (lineStartPattern == null) {
+            out.write(baosBuffer);
+
+            checksum.update(baosBuffer, 0, baosBuffer.length);
+            if (getLogger().isTraceEnabled()) {
+                getLogger().trace("Checksum updated to {}", checksum.getValue());
+            }
+
+            return;
+        }
+
+        final String bufferAsString = new String(baosBuffer, StandardCharsets.UTF_8);
+        final String[] lines = bufferAsString.split("\n");
+        for (final String line : lines) {
+            final boolean startsWithRegex = lineStartPattern.matcher(line).lookingAt();
+
+            if (startsWithRegex || linesBuffer.size() >= maxBufferBytes) {
+                // We found a line that matches our start regex. Flush what we have buffered and reset our buffer.
+                flushLinesBuffer(out, checksum);
+            }
+
+            // This line does not match our start regex. Buffer this line until we encounter a line that does.
+            linesBuffer.write(line.getBytes(StandardCharsets.UTF_8));
+            linesBuffer.write(NEW_LINE_BYTES);
+        }
+    }
+
+    private void flushLinesBuffer(final OutputStream out, final Checksum checksum) throws IOException {
+        linesBuffer.writeTo(out);
+
+        checksum.update(linesBuffer.toByteArray(), 0, linesBuffer.size());
         if (getLogger().isTraceEnabled()) {
             getLogger().trace("Checksum updated to {}", checksum.getValue());
         }
-        baos.reset();
+
+        linesBuffer.reset();
     }
 
     /**
@@ -1074,7 +1169,8 @@
 
     private void persistState(final Map<String, String> state, final ProcessSession session, final ProcessContext context) {
         try {
-            final StateMap oldState = session.getState(getStateScope(context));
+            final Scope scope = getStateScope(context);
+            final StateMap oldState = session == null ? context.getStateManager().getState(scope) : session.getState(scope);
             Map<String, String> updatedState = new HashMap<>();
 
             for(String key : oldState.toMap().keySet()) {
@@ -1092,7 +1188,11 @@
 
             updatedState.putAll(state);
 
-            session.setState(updatedState, getStateScope(context));
+            if (session == null) {
+                context.getStateManager().setState(updatedState, scope);
+            } else {
+                session.setState(updatedState, scope);
+            }
         } catch (final IOException e) {
             getLogger().warn("Failed to store state due to {}; some data may be duplicated on restart of NiFi", new Object[]{e});
         }
@@ -1361,7 +1461,7 @@
 
                 // use a timestamp of lastModified() + 1 so that we do not ingest this file again.
                 getLogger().debug("Completed tailing of file {}; will cleanup state", tailFile);
-                cleanup();
+                cleanup(context);
                 tfo.setState(new TailFileState(tailFile, null, null, 0L, fileToTail.lastModified() + 1L, fileToTail.length(), null, tfo.getState().getBuffer(), tailingPostRollover));
             }
 
@@ -1383,9 +1483,16 @@
      * @return the new, updated state that reflects that the given file has been
      * ingested.
      */
-    private TailFileState consumeFileFully(final File file, final ProcessContext context, final ProcessSession session, TailFileObject tfo) {
+    private TailFileState consumeFileFully(final File file, final ProcessContext context, final ProcessSession session, TailFileObject tfo) throws IOException {
         FlowFile flowFile = session.create();
-        flowFile = session.importFrom(file.toPath(), true, flowFile);
+
+        try (final InputStream fis = new FileInputStream(file)) {
+            flowFile = session.write(flowFile, out -> {
+                flushLinesBuffer(out, new CRC32());
+                StreamUtils.copy(fis, out);
+            });
+        }
+
         if (flowFile.getSize() == 0L) {
             session.remove(flowFile);
         } else {
@@ -1399,7 +1506,7 @@
             getLogger().debug("Created {} from {} and routed to success", new Object[]{flowFile, file});
 
             // use a timestamp of lastModified() + 1 so that we do not ingest this file again.
-            cleanup();
+            cleanup(context);
             tfo.setState(new TailFileState(context.getProperty(FILENAME).evaluateAttributeExpressions().getValue(), null, null, 0L, file.lastModified() + 1L, file.length(), null,
                     tfo.getState().getBuffer()));
 
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.TailFile/additionalDetails.html b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.TailFile/additionalDetails.html
index 1585e71..e5013b2 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.TailFile/additionalDetails.html
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.TailFile/additionalDetails.html
@@ -22,27 +22,48 @@
     </head>
 
     <body>
-        <h3>Modes</h3>
+
+		<h3>Introduction</h3>
+		<p>
+			This processor offers a powerful capability, allowing the user to periodically look at a file that is actively being written to by another process.
+			When the file changes, the new lines are ingested. This Processor assumes that data in the file is textual.
+		</p>
+		<p>
+			Tailing a file from a filesystem is a seemingly simple but notoriously difficult task. This is because we are periodically checking the contents
+			of a file that is being written to. The file may be constantly changing, or it may rarely change. The file may be "rolled over" (i.e., renamed)
+			and it's important that even after restarting the application (NiFi, in this case), we are able to pick up where we left off. Other additional complexities
+			also come into play. For example, NFS mounted drives may indicate that data is readable but then return NUL bytes (Unicode 0) when attempting to read, as
+			the actual bytes are not yet known (see the &lt;Reread when NUL encountered&gt; property), and file systems have different timestamp granularities.
+		</p>
+		<p>
+			This Processor is designed to handle all of these different cases. This can lead to slightly more complex configuration, but this document should provide
+			you with all you need to get started!
+		</p>
+
+
+		<h3>Modes</h3>
         <p>
-            This processor is used to tail a file or multiple files according to multiple modes. The 
+            This processor is used to tail a file or multiple files, depending on the configured mode. The
             mode to choose depends of the logging pattern followed by the file(s) to tail. In any case, if there
-            is a rolling pattern, the rolling files must be plain text files (compression is not supported at 
+            is a rolling pattern, the rolling files must be plain text files (compression is not supported at
             the moment).
         </p>
+
         <ul>
         	<li><b>Single file</b>: the processor will tail the file with the path given in 'File(s) to tail' property.</li>
         	<li><b>Multiple files</b>: the processor will look for files into the 'Base directory'. It will look for file recursively
         	according to the 'Recursive lookup' property and will tail all the files matching the regular expression
         	provided in the 'File(s) to tail' property.</li>
         </ul>
+
         <h3>Rolling filename pattern</h3>
         <p>
         	In case the 'Rolling filename pattern' property is used, when the processor detects that the file to tail has rolled over, the
-        	processor will look for possible missing messages in the rolled file. To do so, the processor will use the pattern to find the 
+        	processor will look for possible missing messages in the rolled file. To do so, the processor will use the pattern to find the
         	rolling files in the same directory as the file to tail.
         </p>
         <p>
-        	In order to keep this property available in the 'Multiple files' mode when multiples files to tail are in the same directory, 
+        	In order to keep this property available in the 'Multiple files' mode when multiples files to tail are in the same directory,
         	it is possible to use the ${filename} tag to reference the name (without extension) of the file to tail. For example, if we have:
         </p>
        	<p>
@@ -72,7 +93,7 @@
         	and new log messages are always appended in my-app.log file.
         </p>
         <p>
-        	In case recursivity is set to 'true'. The regular expression for the files to tail must embrace the possible intermediate directories 
+        	In case recursivity is set to 'true'. The regular expression for the files to tail must embrace the possible intermediate directories
         	between the base directory and the files to tail. Example:
         </p>
         <p>
@@ -89,26 +110,71 @@
 	        	Recursivity: true
         	</code>
        	</p>
-        <p>
+
+		<p>
         	If the processor is configured with '<b>Multiple files</b>' mode, two additional properties are relevant:
         </p>
+
         <ul>
         	<li><b>Lookup frequency</b>: specifies the minimum duration the processor will wait before listing again the files to tail.</li>
-        	<li><b>Maximum age</b>: specifies the necessary minimum duration to consider that no new messages will be appended in a file 
+        	<li><b>Maximum age</b>: specifies the necessary minimum duration to consider that no new messages will be appended in a file
         	regarding its last modification date. If the amount of time that has elapsed since the file was modified is larger than this
         	period of time, the file will not be tailed. For example, if a file was modified 24 hours ago and this property is set to 12 hours,
         	the file will not be tailed. But if this property is set to 36 hours, then the file will continue to be tailed.</li>
         </ul>
-        <p>
-        	It is necessary to pay attention to 'Lookup frequency' and 'Maximum age' properties, as well as the frequency at which the processor is 
-        	triggered, in order to achieve high performance. It is recommended to keep 'Maximum age' > 'Lookup frequency' > processor scheduling 
-        	frequency to avoid missing data. It also recommended not to set 'Maximum Age' too low because if messages are appended in a file 
+
+		<p>
+        	It is necessary to pay attention to 'Lookup frequency' and 'Maximum age' properties, as well as the frequency at which the processor is
+        	triggered, in order to achieve high performance. It is recommended to keep 'Maximum age' > 'Lookup frequency' > processor scheduling
+        	frequency to avoid missing data. It also recommended not to set 'Maximum Age' too low because if messages are appended in a file
         	after this file has been considered "too old", all the messages in the file may be read again, leading to data duplication.
         </p>
-        <p>
-        	If the processor is configured with '<b>Multiple files</b>' mode, the 'Rolling 
+
+		<p>
+        	If the processor is configured with '<b>Multiple files</b>' mode, the 'Rolling
         	filename pattern' property must be specific enough to ensure that only the rolling files will be listed and not other currently tailed
         	files in the same directory (this can be achieved using ${filename} tag).
         </p>
+
+
+		<h3>Handling Multi-Line Messages</h3>
+		<p>
+			Most of the time, when we tail a file, we are happy to receive data periodically, however it was written to the file. There are scenarios, though,
+			where we may have data written in such a way that multiple lines need to be retained together. Take, for example, the following lines of text that
+			might be found in a log file:
+		</p>
+	<code>
+		<pre>
+2021-07-09 14:12:19,731 INFO [main] org.apache.nifi.NiFi Launching NiFi...
+2021-07-09 14:12:19,915 INFO [main] o.a.n.p.AbstractBootstrapPropertiesLoader Determined default application properties path to be '/Users/mpayne/devel/nifi/nifi-assembly/target/nifi-1.14.0-SNAPSHOT-bin/nifi-1.14.0-SNAPSHOT/./conf/nifi.properties'
+2021-07-09 14:12:19,919 INFO [main] o.a.nifi.properties.NiFiPropertiesLoader Loaded 199 properties from /Users/mpayne/devel/nifi/nifi-assembly/target/nifi-1.14.0-SNAPSHOT-bin/nifi-1.14.0-SNAPSHOT/./conf/nifi.properties
+2021-07-09 14:12:19,925 WARN Line 1 of Log Message
+			Line 2: This is an important warning.
+			Line 3: Please do not ignore this warning.
+			Line 4: These lines of text make sense only in the context of the original message.
+2021-07-09 14:12:19,941 INFO [main] Final message in log file
+		</pre>
+	</code>
+
+		<p>
+			In this case, we may want to ensure that the log lines are not ingested in such a way that our multi-line log message is not broken up into Lines 1 and 2 in one FlowFile
+			and Lines 3 and 4 in another. To accomplish this, the Processor exposes the &lt;Line Start Pattern&gt; property. If we set this Property to a value of
+			<code>\d{4}-\d{2}-\d{2}</code>, then we are telling the Processor that each message should begin with 4 digits, followed by a dash, followed by 2 digits, a dash, and 2 digits.
+			I.e., we are telling it that each message begins with a timestamp in yyyy-MM-dd format. Because of this, even if the Processor runs and sees only Lines 1 and 2 of our
+			multiline log message, it will not ingest the data yet. It will wait until it sees the next message, which starts with a timestamp.
+		</p>
+		<p>
+			Note that, because of this, the last message that the Processor will encounter in the above situation is the "Final message in log file" line. At this point, the Processor does
+			not know whether the next line of text it encounters will be part of this line or a new message. As such, it will not ingest this data. It will wait until either another message
+			is encountered (that matches our regex) or until the file is rolled over (renamed). Because of this, there may be some delay in ingesting the last message in the file, if the process
+			that writes to the file just stops writing at this point.
+		</p>
+
+		<p>
+			Additionally, we run the chance of the Regular Expression not matching the data in the file. This could result in buffering all of the file's content, which could cause NiFi
+			to run out of memory. To avoid this, the &lt;Max Buffer Size&gt; property limits the amount of data that can be buffered. If this amount of data is buffered, it will be flushed
+			to the FlowFile, even if another message hasn't been encountered.
+		</p>
+
     </body>
 </html>
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/AbstractTestTailFileScenario.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/AbstractTestTailFileScenario.java
index 2c98a6f..8d890be 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/AbstractTestTailFileScenario.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/AbstractTestTailFileScenario.java
@@ -18,6 +18,7 @@
 
 import org.apache.commons.lang3.SystemUtils;
 import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.MockProcessContext;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.junit.After;
@@ -110,7 +111,7 @@
             randomAccessFile.close();
         }
 
-        processor.cleanup();
+        processor.cleanup(new MockProcessContext(processor));
     }
 
     public void testScenario(List<Action> actions) throws Exception {
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
index e7d6177..a9ac7e5 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
@@ -22,6 +22,7 @@
 import org.apache.nifi.processors.standard.TailFile.TailFileState;
 import org.apache.nifi.state.MockStateManager;
 import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.MockProcessContext;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.junit.After;
@@ -29,6 +30,8 @@
 import org.junit.Assume;
 import org.junit.Before;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.BufferedWriter;
 import java.io.ByteArrayOutputStream;
@@ -58,6 +61,7 @@
 import static org.junit.Assume.assumeFalse;
 
 public class TestTailFile {
+    private static final Logger logger = LoggerFactory.getLogger(TestTailFile.class);
 
     private File file;
     private File existingFile;
@@ -118,7 +122,7 @@
             otherRaf.close();
         }
 
-        processor.cleanup();
+        processor.cleanup(new MockProcessContext(processor));
 
         final File[] files = file.getParentFile().listFiles();
         if (files != null) {
@@ -834,6 +838,73 @@
     }
 
     @Test
+    public void testMultiLineWaitsForRegexMatchShutdownBetweenReads() throws IOException {
+        testMultiLineWaitsForRegexMatch(true);
+    }
+
+    @Test
+    public void testMultiLineWaitsForRegexMatchWithoutShutdownBetweenReads() throws IOException {
+        testMultiLineWaitsForRegexMatch(false);
+    }
+
+    private void testMultiLineWaitsForRegexMatch(final boolean shutdownBetweenReads) throws IOException {
+        runner.setProperty(TailFile.LINE_START_REGEX, "<\\d>");
+        runner.setProperty(TailFile.ROLLING_FILENAME_PATTERN, "log.*");
+
+        final String line1 = "<1>Hello, World\n";
+        final String line2 = "<2>Good-bye, World\n";
+        final String line3 = "<3>Start of multi-line\n";
+        final String line4 = "<4>Last One\n";
+
+        raf.write(line1.getBytes());
+        raf.write(line2.getBytes());
+
+        runner.run(1, shutdownBetweenReads, true);
+
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
+        runner.clearTransferState();
+
+        raf.write(line3.getBytes());
+        runner.run(1, shutdownBetweenReads, shutdownBetweenReads);
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
+        runner.clearTransferState();
+
+        for (int i=0; i < 10; i++) {
+            logger.info("i = " + i);
+            raf.write(String.valueOf(i).getBytes());
+            raf.write("\n".getBytes());
+
+            runner.run(1, shutdownBetweenReads, shutdownBetweenReads);
+            runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0);
+        }
+
+        // The state should indicate that the position is only equal to the length of the first 2 lines because that's all that has been emitted.
+        final Map<String, String> stateMap = runner.getStateManager().getState(Scope.LOCAL).toMap();
+        assertEquals(String.valueOf(line1.length() + line2.length() + line3.length() + 20), stateMap.get("file.0.length"));
+        assertEquals(String.valueOf(line1.length() + line2.length()), stateMap.get("file.0.position"));
+
+        raf.write(line4.getBytes());
+        runner.run(1, shutdownBetweenReads, shutdownBetweenReads);
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
+
+        final MockFlowFile multiLineOutputFile = runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0);
+        multiLineOutputFile.assertContentEquals("<3>Start of multi-line\n0\n1\n2\n3\n4\n5\n6\n7\n8\n9\n");
+        runner.clearTransferState();
+
+        // roll the file
+        raf.close();
+        file.renameTo(new File("target/log.1"));
+        raf = new RandomAccessFile(file, "rw");
+        raf.write(new byte[0]);
+
+        runner.run(1, shutdownBetweenReads, shutdownBetweenReads);
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
+        final MockFlowFile finalOutputFile = runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0);
+        finalOutputFile.assertContentEquals("<4>Last One\n");
+    }
+
+
+    @Test
     public void testRolloverAndUpdateAtSameTime() throws IOException {
         runner.setProperty(TailFile.ROLLING_FILENAME_PATTERN, "log.*");