| Prompt: |
| Write the Python code to read data from CSV files using Apache Beam. |
| |
| Response: |
| Your Apache Beam pipeline can read data from [CSV files](https://en.wikipedia.org/wiki/Comma-separated_values) using the [TextIO connector](https://beam.apache.org/releases/pydoc/current/apache_beam.io.textio.html). |
| |
| ```python |
| import logging |
| import apache_beam as beam |
| |
| from apache_beam import Map |
| from apache_beam.io.textio import ReadFromCsv |
| from apache_beam.options.pipeline_options import PipelineOptions |
| |
| |
| class CsvOptions(PipelineOptions): |
| @classmethod |
| def _add_argparse_args(cls, parser): |
| parser.add_argument( |
| '--file_path', |
| help='Csv file path' |
| ) |
| |
| |
| def run(): |
| """ |
| This pipeline shows how to read from Csv file. |
| """ |
| |
| options = CsvOptions() |
| |
| with beam.Pipeline(options=options) as p: |
| |
| output = (p | "Read from Csv file" >> ReadFromCsv( |
| path=options.file_path |
| ) |
| | "Log Data" >> Map(logging.info)) |
| |
| |
| if __name__ == "__main__": |
| logging.getLogger().setLevel(logging.INFO) |
| run() |
| ``` |
| |
| The `CsvOptions` class is a custom class that extends the `PipelineOptions` object. This code uses [pipeline options](https://beam.apache.org/documentation/patterns/pipeline-options/) to parse command-line arguments. |
| |
| The [`ReadFromCsv` transform](https://beam.apache.org/releases/pydoc/current/apache_beam.io.textio.html#apache_beam.io.textio.ReadFromCsv) is a built-in Apache Beam transform that reads data from a CSV file. |