blob: f87c82aa0b981afa51f8b01e0dfbbd7218a8b28c [file] [log] [blame] [view]
Prompt:
Write a sample Java code snippet that writes data in CSV format using Apache Beam.
Response:
Your Apache Beam pipeline can write data to CSV files using the CsvIO connector. Here is an example of how to achieve this:
```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.csv.CsvIO;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.JavaFieldSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
import org.apache.commons.csv.CSVFormat;
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
public class WriteCsvFile {
// ExampleRecord is a POJO that represents the data to be written to the CSV file
@DefaultSchema(JavaFieldSchema.class)
public static class ExampleRecord implements Serializable {
public int id;
public String month;
public String amount;
public ExampleRecord() {
}
public ExampleRecord(int id, String month, String amount) {
this.id = id;
this.month = month;
this.amount = amount;
}
}
public interface WriteCsvFileOptions extends PipelineOptions {
@Description("A file path to write CSV files to")
@Validation.Required
String getFilePath();
void setFilePath(String filePath);
}
public static void main(String[] args) {
WriteCsvFileOptions options =
PipelineOptionsFactory.fromArgs(args)
.withValidation().as(WriteCsvFileOptions.class);
Pipeline p = Pipeline.create(options);
List<ExampleRecord> rows =
Arrays.asList(
new ExampleRecord(1, "January", "$1000"),
new ExampleRecord(2, "February", "$2000"),
new ExampleRecord(3, "March", "$3000"));
CSVFormat csvFormat =
CSVFormat.DEFAULT.withHeaderComments("CSV file created by Apache Beam")
.withCommentMarker('#');
p.apply("Create collection", Create.of(rows))
.apply(
"Write to CSV file",
CsvIO.<ExampleRecord>write()
.to(options.getFilePath())
.withNumShards(1));
p.run();
}
}
```