This page describes common file processing tasks. For more information on file-based I/O, see [Pipeline I/O]({{ site.baseurl }}/documentation/programming-guide/#pipeline-io) and [File-based input and output data]({{ site.baseurl }}/documentation/programming-guide/#file-based-data).
This section shows you how to process files as they arrive in your file system or object store (like Google Cloud Storage). You can continuously read files, or trigger stream or batch processing pipelines when a file arrives.
{:.language-java} You can use `FileIO` or `TextIO` to continuously read the source for new files.
{:.language-java} Use the `FileIO` class to continuously watch a single file pattern. The following example matches a file pattern repeatedly every 30 seconds, continuously returns new matched files as an unbounded `PCollection<Metadata>`, and stops if no new files appear for one hour:
{% github_sample /apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java tag:FileProcessPatternProcessNewFilesSnip1 %}
{:.language-java} The `TextIO` class `watchForNewFiles` property streams new file matches.
{% github_sample /apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java tag:FileProcessPatternProcessNewFilesSnip2 %}
{:.language-java} Some runners may retain file lists during updates, but file lists don’t persist when you restart a pipeline. You can save file lists by:
{:.language-java}
{:.language-py} The continuous-read option is not available for Python.
A streaming pipeline can process data from an unbounded source. For example, to trigger stream processing with Google Cloud Pub/Sub, handle each arriving message in a `DoFn` that follows the Google Cloud Pub/Sub source.

To start or schedule a batch pipeline job when a file arrives, write the triggering event in the source file itself. This has the most latency because the pipeline must initialize before processing. It’s best suited for low-frequency updates with large file sizes.
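
{:.language-py} For the Pub/Sub-triggered streaming approach described above, the following is a minimal sketch rather than a reference implementation. It assumes that each Pub/Sub message payload is the URI of one newly arrived file, encoded as UTF-8 text; that message format, the helper names `uri_from_message`, `read_announced_file`, and `run`, and the step labels are all hypothetical. The Beam imports are deferred into the functions that need them so that the payload-parsing helper can be tried without Beam installed.

```python
def uri_from_message(payload: bytes) -> str:
    """Decode a Pub/Sub payload, assumed to hold a single file URI as UTF-8 text."""
    uri = payload.decode("utf-8").strip()
    if not uri:
        raise ValueError("Pub/Sub message did not contain a file URI")
    return uri


def read_announced_file(payload: bytes):
    """Runs in the step that follows the Pub/Sub source.

    Opens the announced file and returns a (uri, contents) pair. The whole
    file is read into memory, so this only suits files that fit in memory.
    """
    # Deferred import: requires the apache-beam package.
    from apache_beam.io.filesystems import FileSystems

    uri = uri_from_message(payload)
    with FileSystems.open(uri) as handle:
        return uri, handle.read().decode("utf-8")


def run(subscription: str) -> None:
    """Build and run a streaming pipeline fed by a Pub/Sub subscription."""
    # Deferred imports: require apache-beam with the GCP extras.
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    options = PipelineOptions(streaming=True)
    with beam.Pipeline(options=options) as pipeline:
        (
            pipeline
            # By default, ReadFromPubSub emits each message payload as bytes.
            | "ReadNotifications" >> beam.io.ReadFromPubSub(subscription=subscription)
            # beam.Map wraps the plain function in a DoFn.
            | "ReadFile" >> beam.Map(read_announced_file)
            | "Print" >> beam.Map(print)
        )
```

Calling `run("projects/<project>/subscriptions/<subscription>")` with a real subscription path would start the pipeline. How messages get published when a file arrives is outside this sketch.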
{:.language-java} Use the `FileIO` class to read filenames in a pipeline job. `FileIO` returns a `PCollection<ReadableFile>` object, and the `ReadableFile` instance contains the filename.
{:.language-java} To access filenames:
{:.language-java}
1. Create a `ReadableFile` instance with `FileIO`. `FileIO` returns a `PCollection<ReadableFile>` object. The `ReadableFile` class contains the filename.
2. Call the `readFullyAsUTF8String()` method to read the file into memory and return its contents as a `String` object. If memory is limited, you can use utility classes like `FileSystems` to work directly with the file.

{:.language-py} To read filenames in a pipeline job:
{:.language-py}
1. Use the `FileSystems` module to get a list of files that match a glob pattern.
2. Pass the list of files to a `PCollection`.

{% github_sample /apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java tag:FileProcessPatternAccessMetadataSnip1 %}
{% github_sample /apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/snippets.py tag:FileProcessPatternAccessMetadataSnip1 %}