| Prompt: |
| Write a sample Java code snippet that writes data in CSV format using Apache Beam. |
| |
| Response: |
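| Your Apache Beam pipeline can write data to CSV files using the `CsvIO` connector, which turns schema-aware elements (here, a POJO annotated with `@DefaultSchema`) into CSV lines formatted according to an Apache Commons `CSVFormat`. Here is an example of how to achieve this: |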
| |
| ```java |
| |
| import org.apache.beam.sdk.Pipeline; |
| import org.apache.beam.sdk.io.csv.CsvIO; |
| import org.apache.beam.sdk.options.Description; |
| import org.apache.beam.sdk.options.PipelineOptions; |
| import org.apache.beam.sdk.options.PipelineOptionsFactory; |
| import org.apache.beam.sdk.options.Validation; |
| import org.apache.beam.sdk.schemas.JavaFieldSchema; |
| import org.apache.beam.sdk.schemas.annotations.DefaultSchema; |
| import org.apache.beam.sdk.transforms.Create; |
| import org.apache.beam.sdk.values.PCollection; |
| import org.apache.commons.csv.CSVFormat; |
| |
| import java.io.Serializable; |
| import java.util.Arrays; |
| import java.util.List; |
| |
| /** |
|  * Example pipeline that writes a small in-memory collection of {@link ExampleRecord}s to CSV |
|  * files with Beam's {@link CsvIO} connector. |
|  */ |
| public class WriteCsvFile { |
| |
|   /** |
|    * A POJO representing one CSV row. {@link JavaFieldSchema} infers the Beam schema from the |
|    * public fields; CsvIO uses that schema to render each record as a CSV line. |
|    */ |
|   @DefaultSchema(JavaFieldSchema.class) |
|   public static class ExampleRecord implements Serializable { |
|     public int id; |
|     public String month; |
|     public String amount; |
| |
|     /** No-arg constructor, available for schema-based instantiation. */ |
|     public ExampleRecord() { |
|     } |
| |
|     /** |
|      * Creates a populated record. |
|      * |
|      * @param id numeric identifier of the row |
|      * @param month month label for the row |
|      * @param amount amount, stored as preformatted text (e.g. "$1000") |
|      */ |
|     public ExampleRecord(int id, String month, String amount) { |
|       this.id = id; |
|       this.month = month; |
|       this.amount = amount; |
|     } |
|   } |
| |
|   /** Command-line options for {@link WriteCsvFile}. */ |
|   public interface WriteCsvFileOptions extends PipelineOptions { |
| |
|     /** Output location; passed to {@link CsvIO#write} as the output filename prefix. */ |
|     @Description("A file path to write CSV files to") |
|     @Validation.Required |
|     String getFilePath(); |
| |
|     void setFilePath(String filePath); |
|   } |
| |
|   /** |
|    * Builds and runs the pipeline: creates three example records and writes them as CSV. |
|    * |
|    * @param args pipeline options, e.g. {@code --filePath=/tmp/output} (required) |
|    */ |
|   public static void main(String[] args) { |
|     WriteCsvFileOptions options = |
|         PipelineOptionsFactory.fromArgs(args).withValidation().as(WriteCsvFileOptions.class); |
| |
|     Pipeline p = Pipeline.create(options); |
| |
|     List<ExampleRecord> rows = |
|         Arrays.asList( |
|             new ExampleRecord(1, "January", "$1000"), |
|             new ExampleRecord(2, "February", "$2000"), |
|             new ExampleRecord(3, "March", "$3000")); |
| |
|     // commons-csv only emits header comments when a comment marker is set, so '#' is |
|     // configured as the marker that prefixes the comment line. |
|     CSVFormat csvFormat = |
|         CSVFormat.DEFAULT |
|             .withHeaderComments("CSV file created by Apache Beam") |
|             .withCommentMarker('#'); |
| |
|     PCollection<ExampleRecord> records = p.apply("Create collection", Create.of(rows)); |
| |
|     // CsvIO.write takes both the output filename prefix and the CSVFormat to apply; |
|     // withNumShards(1) forces a single output shard (one file). |
|     records.apply( |
|         "Write to CSV file", |
|         CsvIO.<ExampleRecord>write(options.getFilePath(), csvFormat).withNumShards(1)); |
| |
|     // Wait for completion so main() does not return before the files are written on |
|     // runners whose run() is asynchronous. |
|     p.run().waitUntilFinish(); |
|   } |
| } |
| ``` |