This page describes common file processing tasks. For more information on file-based I/O, see Pipeline I/O and File-based input and output data.
{{< language-switcher java py >}}
This section shows you how to process files as they arrive in your file system or object store (like Google Cloud Storage). You can continuously read files, or you can trigger stream or batch processing pipelines when a file arrives.
{{< paragraph class="language-java" >}} You can use FileIO or TextIO to continuously read the source for new files. {{< /paragraph >}}
{{< paragraph class="language-java" >}} Use the FileIO class to continuously watch a single file pattern. The following example matches a file pattern repeatedly every 30 seconds, continuously returns new matched files as an unbounded PCollection<Metadata>, and stops if no new files appear for one hour: {{< /paragraph >}}
{{< highlight java >}} {{< code_sample "examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java" FileProcessPatternProcessNewFilesSnip1 >}} {{< /highlight >}}
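For intuition, the continuous-match behavior can be sketched in plain Python. This is a stand-in for the semantics of Beam's continuous file matching, not Beam code, and the parameter names are illustrative: poll a glob pattern on an interval, emit only files not seen before, and stop once no new file has appeared within the termination window.

```python
import glob
import time

def watch_for_new_files(pattern, poll_interval=30, stop_after=3600,
                        clock=time.monotonic, sleep=time.sleep):
    """Yield newly matched files for `pattern` until no new file has
    appeared for `stop_after` seconds. A plain-Python stand-in for the
    semantics of Beam's continuous file matching, not Beam code."""
    seen = set()
    last_new = clock()
    while clock() - last_new < stop_after:
        for path in sorted(glob.glob(pattern)):
            if path not in seen:
                seen.add(path)
                last_new = clock()  # reset the termination window
                yield path
        sleep(poll_interval)
```

In the real transform the runner performs the polling and the matched files arrive as an unbounded PCollection<Metadata>; `clock` and `sleep` are injected here only so the sketch can be exercised without real waiting.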
{{< paragraph class="language-java" >}} The watchForNewFiles property of the TextIO class streams new file matches. {{< /paragraph >}}
{{< highlight java >}} {{< code_sample "examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java" FileProcessPatternProcessNewFilesSnip2 >}} {{< /highlight >}}
{{< paragraph class="language-java" >}} Some runners may retain file lists during updates, but file lists don’t persist when you restart a pipeline. You can save file lists by: {{< /paragraph >}}

{{< paragraph class="language-java" >}}

- Storing the file list in an external file
- Backfilling missing files in another pipeline

{{< /paragraph >}}
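As a minimal sketch of the external-storage idea (not a Beam API), the helpers below record processed filenames in a local ledger file so that a restarted job can skip files it already handled. The ledger format and helper names are hypothetical; a production pipeline would more likely use a database or an object-store manifest.

```python
import os

def load_processed(ledger_path):
    """Return the set of filenames recorded in the ledger, if any.
    (Hypothetical helper; the ledger is a plain newline-delimited file.)"""
    if not os.path.exists(ledger_path):
        return set()
    with open(ledger_path) as f:
        return {line.strip() for line in f if line.strip()}

def mark_processed(ledger_path, filename):
    """Append a filename to the ledger so a restarted job can skip it."""
    with open(ledger_path, "a") as f:
        f.write(filename + "\n")
```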
{{< paragraph class="language-py" >}} The continuous-read option is not available for Python. {{< /paragraph >}}
A streaming pipeline can process data from an unbounded source. For example, to trigger stream processing with Google Cloud Pub/Sub:

1. Use an external process to detect when new files arrive.
2. Send a Google Cloud Pub/Sub message with a URI to the file.
3. Access the URI from a DoFn that follows the Google Cloud Pub/Sub source.
4. Process the file.

To start or schedule a batch pipeline job when a file arrives, write the triggering event in the source file itself. This approach has the most latency, because the pipeline must initialize before processing. It’s best suited for low-frequency updates to large files.
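To make the Pub/Sub-triggered pattern concrete, here is a hedged plain-Python sketch (not the Beam Pub/Sub API) of a DoFn-style function that receives a message whose payload carries a file URI and reads that file. The JSON payload shape and the `read_file` helper are assumptions for illustration; a real pipeline would open the URI with Beam's FileSystems utilities.

```python
import json

def process_file_message(message_bytes,
                         read_file=lambda uri: open(uri).read()):
    """Stand-in for a DoFn that follows a Pub/Sub source: decode the
    message, extract the file URI, and hand the file contents downstream.
    The payload shape ({"uri": ...}) and `read_file` are illustrative
    assumptions, not part of the Beam API."""
    message = json.loads(message_bytes.decode("utf-8"))
    uri = message["uri"]
    return uri, read_file(uri)
```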
{{< paragraph class="language-java" >}} Use the FileIO class to read filenames in a pipeline job. FileIO returns a PCollection<ReadableFile> object, and the ReadableFile instance contains the filename. {{< /paragraph >}}
{{< paragraph class="language-java" >}} To access filenames: {{< /paragraph >}}

{{< paragraph class="language-java" >}}

1. Create a ReadableFile instance with FileIO. FileIO returns a PCollection<ReadableFile> object, and the ReadableFile class contains the filename.
2. Call the readFullyAsUTF8String() method to read the file into memory and return its contents as a String object. If memory is limited, you can use utility classes like FileSystems to work directly with the file.

{{< /paragraph >}}

{{< paragraph class="language-py" >}} To read filenames in a pipeline job: {{< /paragraph >}}
{{< paragraph class="language-py" >}}

1. Collect the list of file URIs. You can use the FileSystems module to get a list of files that match a glob pattern.
2. Pass the file URIs to a PCollection.

{{< /paragraph >}}

{{< highlight java >}} {{< code_sample "examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java" FileProcessPatternAccessMetadataSnip1 >}} {{< /highlight >}}
{{< highlight py >}} {{< code_sample "sdks/python/apache_beam/examples/snippets/snippets.py" FileProcessPatternAccessMetadataSnip1 >}} {{< /highlight >}}
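The Python steps above can also be sketched without a Beam installation by standing in the stdlib `glob` module for Beam's FileSystems matching (the function name here is illustrative, not a Beam API):

```python
import glob

def collect_file_uris(pattern):
    """Collect file paths matching a glob pattern. A local stand-in for
    apache_beam.io.filesystems.FileSystems matching, which accepts
    similar patterns against any supported file system."""
    return sorted(glob.glob(pattern))
```

The resulting list is what you would feed into the pipeline to form a PCollection, for example via `beam.Create`.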