Prompt: Write a sample Java code snippet that writes data in CSV format using Apache Beam.

Response: Your Apache Beam pipeline can write data to CSV files using the CsvIO connector. Here is an example of how to achieve this:


import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.csv.CsvIO; import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.schemas.JavaFieldSchema; import org.apache.beam.sdk.schemas.annotations.DefaultSchema; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.values.PCollection; import org.apache.commons.csv.CSVFormat; import java.io.Serializable; import java.util.Arrays; import java.util.List; public class WriteCsvFile { // ExampleRecord is a POJO that represents the data to be written to the CSV file @DefaultSchema(JavaFieldSchema.class) public static class ExampleRecord implements Serializable { public int id; public String month; public String amount; public ExampleRecord() { } public ExampleRecord(int id, String month, String amount) { this.id = id; this.month = month; this.amount = amount; } } public interface WriteCsvFileOptions extends PipelineOptions { @Description("A file path to write CSV files to") @Validation.Required String getFilePath(); void setFilePath(String filePath); } public static void main(String[] args) { WriteCsvFileOptions options = PipelineOptionsFactory.fromArgs(args) .withValidation().as(WriteCsvFileOptions.class); Pipeline p = Pipeline.create(options); List<ExampleRecord> rows = Arrays.asList( new ExampleRecord(1, "January", "$1000"), new ExampleRecord(2, "February", "$2000"), new ExampleRecord(3, "March", "$3000")); CSVFormat csvFormat = CSVFormat.DEFAULT.withHeaderComments("CSV file created by Apache Beam") .withCommentMarker('#'); p.apply("Create collection", Create.of(rows)) .apply( "Write to CSV file", CsvIO.<ExampleRecord>write() .to(options.getFilePath()) .withNumShards(1)); p.run(); } }