blob: 45c3f2d0b7bd30d9f3a903febb09cf0e916db890 [file] [log] [blame] [view]
Prompt:
Write a sample Java code snippet that writes data to a Parquet file using Apache Beam.
Response:
Your Apache Beam pipeline can write data to Apache Parquet files using the ParquetIO connector. Here is an example of how to achieve this:
```java
package parquet;
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.avro.reflect.ReflectData;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.parquet.ParquetIO;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SimpleFunction;
public class WriteSampleParquetFile {
// Represents a DTO to be written to Parquet.
public static class SampleRecord implements Serializable {
public static final String COLUMN_ID = "id";
public static final String COLUMN_MONTH = "month";
public static final String COLUMN_AMOUNT = "amount";
private int id;
private String month;
private String amount;
public SampleRecord() {}
public SampleRecord(int id, String month, String amount) {
this.id = id;
this.month = month;
this.amount = amount;
}
public int getId() {
return id;
}
public String getMonth() {
return month;
}
public String getAmount() {
return amount;
}
}
// Pipeline options for writing to Parquet files.
public interface WriteSampleParquetFileOptions extends PipelineOptions {
@Description("A file path to write sample Parquet files to")
@Validation.Required
String getPath();
void setPath(String path);
}
public static void main(String[] args) {
WriteSampleParquetFileOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(WriteSampleParquetFileOptions.class);
// Get the Avro schema for the SampleRecord object.
Schema sampleRecordSchema = ReflectData.get().getSchema(SampleRecord.class);
// Create a pipeline.
Pipeline p = Pipeline.create(options);
// Create a list of SampleRecord objects.
List<SampleRecord> rows =
Arrays.asList(
new SampleRecord(1, "January", "$1000"),
new SampleRecord(2, "February", "$2000"),
new SampleRecord(3, "March", "$3000"));
// Apply the Create transform to the pipeline to create a PCollection from the list of SampleRecord objects.
p.apply("Create", Create.of(rows))
// Apply the MapElements transform to the pipeline to map the SampleRecord objects to GenericRecord objects.
.apply(
"Map Sample record to GenericRecord",
MapElements.via(new MapSampleRecordToGenericRecord(sampleRecordSchema)))
// Set the coder for the GenericRecord objects to AvroCoder.
.setCoder(AvroCoder.of(sampleRecordSchema))
// Apply the FileIO.write() transform to the pipeline to write the GenericRecord objects to a Parquet file.
.apply(
"Write Parquet file",
FileIO.<GenericRecord>write()
.via(ParquetIO.sink(sampleRecordSchema))
.to(options.getPath()));
// Run the pipeline.
p.run();
}
// A SimpleFunction that maps SampleRecord objects to GenericRecord objects.
private static class MapSampleRecordToGenericRecord
extends SimpleFunction<SampleRecord, GenericRecord> {
private final Schema schema;
public MapSampleRecordToGenericRecord(Schema schema) {
this.schema = schema;
}
// Maps a SampleRecord object to a GenericRecord object.
@Override
public GenericRecord apply(SampleRecord input) {
GenericRecordBuilder builder = new GenericRecordBuilder(schema);
builder
.set(SampleRecord.COLUMN_MONTH, input.getMonth())
.set(SampleRecord.COLUMN_AMOUNT, input.getAmount())
.set(SampleRecord.COLUMN_ID, input.getId());
return builder.build();
}
}
}
```
The `WriteSampleParquetFileOptions` interface extends `PipelineOptions` to define the required command-line argument `--path`, which specifies the location where the Parquet files are written.