| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.beam.sdk.io.parquet; |
| |
| import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull; |
| import static org.apache.parquet.hadoop.ParquetFileWriter.Mode.OVERWRITE; |
| |
| import com.google.auto.value.AutoValue; |
| import java.io.IOException; |
| import java.io.OutputStream; |
| import java.nio.channels.Channels; |
| import java.nio.channels.SeekableByteChannel; |
| import java.nio.channels.WritableByteChannel; |
| import javax.annotation.Nullable; |
| import org.apache.avro.Schema; |
| import org.apache.avro.generic.GenericRecord; |
| import org.apache.beam.sdk.annotations.Experimental; |
| import org.apache.beam.sdk.coders.AvroCoder; |
| import org.apache.beam.sdk.coders.StringUtf8Coder; |
| import org.apache.beam.sdk.io.FileIO; |
| import org.apache.beam.sdk.io.fs.ResourceId; |
| import org.apache.beam.sdk.options.ValueProvider; |
| import org.apache.beam.sdk.transforms.Create; |
| import org.apache.beam.sdk.transforms.DoFn; |
| import org.apache.beam.sdk.transforms.PTransform; |
| import org.apache.beam.sdk.transforms.ParDo; |
| import org.apache.beam.sdk.transforms.display.DisplayData; |
| import org.apache.beam.sdk.values.PBegin; |
| import org.apache.beam.sdk.values.PCollection; |
| import org.apache.parquet.avro.AvroParquetReader; |
| import org.apache.parquet.avro.AvroParquetWriter; |
| import org.apache.parquet.hadoop.ParquetReader; |
| import org.apache.parquet.hadoop.ParquetWriter; |
| import org.apache.parquet.hadoop.metadata.CompressionCodecName; |
| import org.apache.parquet.io.DelegatingSeekableInputStream; |
| import org.apache.parquet.io.InputFile; |
| import org.apache.parquet.io.OutputFile; |
| import org.apache.parquet.io.PositionOutputStream; |
| import org.apache.parquet.io.SeekableInputStream; |
| |
| /** |
| * IO to read and write Parquet files. |
| * |
| * <h3>Reading Parquet files</h3> |
| * |
| * <p>{@link ParquetIO} source reads Parquet files and returns a {@link PCollection} whose |
| * elements are Avro {@link GenericRecord}s. |
| * |
| * <p>To configure the {@link Read}, you have to provide the file pattern (via {@code from(...)}) |
| * of the Parquet files and the schema of the records. |
| * |
| * <p>For example: |
| * |
| * <pre>{@code |
| * PCollection<GenericRecord> records = pipeline.apply(ParquetIO.read(SCHEMA).from("/foo/bar")); |
| * ... |
| * }</pre> |
| * |
| * <p>As {@link Read} is based on {@link FileIO}, it supports reading from any filesystem for |
| * which Beam has a registered {@code FileSystem} implementation (HDFS, GCS, local, ...). |
| * |
| * <p>For more advanced use cases, like reading each file in a {@link PCollection} of {@link |
| * FileIO.ReadableFile}, use the {@link ReadFiles} transform. |
| * |
| * <p>For example: |
| * |
| * <pre>{@code |
| * PCollection<FileIO.ReadableFile> files = pipeline |
| * .apply(FileIO.match().filepattern(options.getInputFilepattern())) |
| * .apply(FileIO.readMatches()); |
| * |
| * PCollection<GenericRecord> output = files.apply(ParquetIO.readFiles(SCHEMA)); |
| * }</pre> |
| * |
| * <h3>Writing Parquet files</h3> |
| * |
| * <p>{@link ParquetIO.Sink} allows you to write a {@link PCollection} of {@link GenericRecord} into |
| * a Parquet file. It is designed to be used with the general-purpose {@link FileIO} transforms, |
| * specifically {@link FileIO#write} and {@link FileIO#writeDynamic} (a sketch of the latter |
| * follows the example below). |
| * |
| * <p>By default, {@link ParquetIO.Sink} produces output files compressed with the SNAPPY codec |
| * ({@link CompressionCodecName#SNAPPY}). This default can be overridden using {@link |
| * ParquetIO.Sink#withCompressionCodec(CompressionCodecName)}. |
| * |
| * <p>For example: |
| * |
| * <pre>{@code |
| * pipeline |
| * .apply(...) // PCollection<GenericRecord> |
| * .apply(FileIO |
| * .<GenericRecord>write() |
| * .via(ParquetIO.sink(SCHEMA) |
| * .withCompressionCodec(CompressionCodecName.SNAPPY)) |
| * .to("destination/path")) |
| * }</pre> |
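| * |
| * <p>{@link ParquetIO.Sink} works the same way with {@link FileIO#writeDynamic}. The following is |
| * a minimal sketch, assuming each record carries a string {@code "type"} field that selects its |
| * destination (the field name and paths here are illustrative, not part of the API): |
| * |
| * <pre>{@code |
| * pipeline |
| * .apply(...) // PCollection<GenericRecord> |
| * .apply(FileIO.<String, GenericRecord>writeDynamic() |
| * .by(record -> record.get("type").toString()) |
| * .via(ParquetIO.sink(SCHEMA)) |
| * .to("destination/path") |
| * .withNaming(type -> FileIO.Write.defaultNaming(type, ".parquet")) |
| * .withDestinationCoder(StringUtf8Coder.of())); |
| * }</pre> |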
| * |
| * <p>This IO API is considered experimental and may break or receive backwards-incompatible changes |
| * in future versions of the Apache Beam SDK. |
| * |
| * @see <a href="https://beam.apache.org/documentation/io/built-in/parquet/">Beam ParquetIO |
| * documentation</a> |
| */ |
| @Experimental(Experimental.Kind.SOURCE_SINK) |
| public class ParquetIO { |
| |
| /** |
| * Reads {@link GenericRecord} from a Parquet file (or multiple Parquet files matching the |
| * pattern). |
| */ |
| public static Read read(Schema schema) { |
| return new AutoValue_ParquetIO_Read.Builder().setSchema(schema).build(); |
| } |
| |
| /** |
| * Like {@link #read(Schema)}, but reads each file in a {@link PCollection} of {@link |
| * org.apache.beam.sdk.io.FileIO.ReadableFile}, which allows more flexible usage. |
| */ |
| public static ReadFiles readFiles(Schema schema) { |
| return new AutoValue_ParquetIO_ReadFiles.Builder().setSchema(schema).build(); |
| } |
| |
| /** Implementation of {@link #read(Schema)}. */ |
| @AutoValue |
| public abstract static class Read extends PTransform<PBegin, PCollection<GenericRecord>> { |
| |
| @Nullable |
| abstract ValueProvider<String> getFilepattern(); |
| |
| @Nullable |
| abstract Schema getSchema(); |
| |
| abstract Builder toBuilder(); |
| |
| @AutoValue.Builder |
| abstract static class Builder { |
| abstract Builder setFilepattern(ValueProvider<String> filepattern); |
| |
| abstract Builder setSchema(Schema schema); |
| |
| abstract Read build(); |
| } |
| |
| /** Reads from the given filename or filepattern. */ |
| public Read from(ValueProvider<String> filepattern) { |
| return toBuilder().setFilepattern(filepattern).build(); |
| } |
| |
| /** Like {@link #from(ValueProvider)}. */ |
| public Read from(String filepattern) { |
| return from(ValueProvider.StaticValueProvider.of(filepattern)); |
| } |
| |
| @Override |
| public PCollection<GenericRecord> expand(PBegin input) { |
| checkNotNull(getFilepattern(), "Filepattern cannot be null."); |
| |
| return input |
| .apply("Create filepattern", Create.ofProvider(getFilepattern(), StringUtf8Coder.of())) |
| .apply(FileIO.matchAll()) |
| .apply(FileIO.readMatches()) |
| .apply(readFiles(getSchema())); |
| } |
| |
| @Override |
| public void populateDisplayData(DisplayData.Builder builder) { |
| super.populateDisplayData(builder); |
| builder.add( |
| DisplayData.item("filePattern", getFilepattern()).withLabel("Input File Pattern")); |
| } |
| } |
| |
| /** Implementation of {@link #readFiles(Schema)}. */ |
| @AutoValue |
| public abstract static class ReadFiles |
| extends PTransform<PCollection<FileIO.ReadableFile>, PCollection<GenericRecord>> { |
| |
| @Nullable |
| abstract Schema getSchema(); |
| |
| @AutoValue.Builder |
| abstract static class Builder { |
| abstract Builder setSchema(Schema schema); |
| |
| abstract ReadFiles build(); |
| } |
| |
| @Override |
| public PCollection<GenericRecord> expand(PCollection<FileIO.ReadableFile> input) { |
| checkNotNull(getSchema(), "Schema cannot be null"); |
| return input.apply(ParDo.of(new ReadFn())).setCoder(AvroCoder.of(getSchema())); |
| } |
| |
| static class ReadFn extends DoFn<FileIO.ReadableFile, GenericRecord> { |
| |
| @ProcessElement |
| public void processElement(ProcessContext processContext) throws Exception { |
| FileIO.ReadableFile file = processContext.element(); |
| |
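| // Parquet stores its footer (schema and row-group offsets) at the end of the file, so the |
| // reader needs random access to the underlying channel. |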
| if (!file.getMetadata().isReadSeekEfficient()) { |
| ResourceId filename = file.getMetadata().resourceId(); |
| throw new RuntimeException(String.format("File has to be seekable: %s", filename)); |
| } |
| |
| SeekableByteChannel seekableByteChannel = file.openSeekable(); |
| |
| try (ParquetReader<GenericRecord> reader = |
| AvroParquetReader.<GenericRecord>builder(new BeamParquetInputFile(seekableByteChannel)) |
| .build()) { |
| GenericRecord read; |
| while ((read = reader.read()) != null) { |
| processContext.output(read); |
| } |
| } |
| } |
| } |
| |
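| /** Adapts a {@link SeekableByteChannel} to Parquet's {@link InputFile} interface. */ |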
| private static class BeamParquetInputFile implements InputFile { |
| |
| private SeekableByteChannel seekableByteChannel; |
| |
| BeamParquetInputFile(SeekableByteChannel seekableByteChannel) { |
| this.seekableByteChannel = seekableByteChannel; |
| } |
| |
| @Override |
| public long getLength() throws IOException { |
| return seekableByteChannel.size(); |
| } |
| |
| @Override |
| public SeekableInputStream newStream() { |
| return new DelegatingSeekableInputStream(Channels.newInputStream(seekableByteChannel)) { |
| |
| @Override |
| public long getPos() throws IOException { |
| return seekableByteChannel.position(); |
| } |
| |
| @Override |
| public void seek(long newPos) throws IOException { |
| seekableByteChannel.position(newPos); |
| } |
| }; |
| } |
| } |
| } |
| |
| /** Creates a {@link Sink} for use with {@link FileIO#write}. */ |
| public static Sink sink(Schema schema) { |
| return new AutoValue_ParquetIO_Sink.Builder() |
| .setJsonSchema(schema.toString()) |
| .setCompressionCodec(CompressionCodecName.SNAPPY) |
| .build(); |
| } |
| |
| /** Implementation of {@link #sink}. */ |
| @AutoValue |
| public abstract static class Sink implements FileIO.Sink<GenericRecord> { |
| |
| @Nullable |
| abstract String getJsonSchema(); |
| |
| abstract CompressionCodecName getCompressionCodec(); |
| |
| abstract Builder toBuilder(); |
| |
| @AutoValue.Builder |
| abstract static class Builder { |
| abstract Builder setJsonSchema(String jsonSchema); |
| |
| abstract Builder setCompressionCodec(CompressionCodecName compressionCodec); |
| |
| abstract Sink build(); |
| } |
| |
| /** Specifies the compression codec. Defaults to {@link CompressionCodecName#SNAPPY}. */ |
| public Sink withCompressionCodec(CompressionCodecName compressionCodecName) { |
| return toBuilder().setCompressionCodec(compressionCodecName).build(); |
| } |
| |
| @Nullable private transient ParquetWriter<GenericRecord> writer; |
| |
| @Override |
| public void open(WritableByteChannel channel) throws IOException { |
| checkNotNull(getJsonSchema(), "Schema cannot be null"); |
| |
| Schema schema = new Schema.Parser().parse(getJsonSchema()); |
| |
| BeamParquetOutputFile beamParquetOutputFile = |
| new BeamParquetOutputFile(Channels.newOutputStream(channel)); |
| |
| this.writer = |
| AvroParquetWriter.<GenericRecord>builder(beamParquetOutputFile) |
| .withSchema(schema) |
| .withCompressionCodec(getCompressionCodec()) |
| .withWriteMode(OVERWRITE) |
| .build(); |
| } |
| |
| @Override |
| public void write(GenericRecord element) throws IOException { |
| checkNotNull(writer, "Writer cannot be null"); |
| writer.write(element); |
| } |
| |
| @Override |
| public void flush() throws IOException { |
| // the only way to completely flush the output is to call writer.close() here |
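| // FileIO invokes flush() once, just before closing the channel, so no further writes follow. |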
| writer.close(); |
| } |
| |
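| /** Adapts an {@link OutputStream} (backed by a Beam {@link WritableByteChannel}) to Parquet's {@link OutputFile} interface. */ |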
| private static class BeamParquetOutputFile implements OutputFile { |
| |
| private OutputStream outputStream; |
| |
| BeamParquetOutputFile(OutputStream outputStream) { |
| this.outputStream = outputStream; |
| } |
| |
| @Override |
| public PositionOutputStream create(long blockSizeHint) { |
| return new BeamOutputStream(outputStream); |
| } |
| |
| @Override |
| public PositionOutputStream createOrOverwrite(long blockSizeHint) { |
| return new BeamOutputStream(outputStream); |
| } |
| |
| @Override |
| public boolean supportsBlockSize() { |
| return false; |
| } |
| |
| @Override |
| public long defaultBlockSize() { |
| return 0; |
| } |
| } |
| |
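| /** |
| * An {@link OutputStream} wrapper that tracks its write position; Parquet uses {@link #getPos()} |
| * to record offsets in the file footer. |
| */ |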
| private static class BeamOutputStream extends PositionOutputStream { |
| private long position = 0; |
| private OutputStream outputStream; |
| |
| private BeamOutputStream(OutputStream outputStream) { |
| this.outputStream = outputStream; |
| } |
| |
| @Override |
| public long getPos() throws IOException { |
| return position; |
| } |
| |
| @Override |
| public void write(int b) throws IOException { |
| position++; |
| outputStream.write(b); |
| } |
| |
| @Override |
| public void write(byte[] b) throws IOException { |
| write(b, 0, b.length); |
| } |
| |
| @Override |
| public void write(byte[] b, int off, int len) throws IOException { |
| outputStream.write(b, off, len); |
| position += len; |
| } |
| |
| @Override |
| public void flush() throws IOException { |
| outputStream.flush(); |
| } |
| |
| @Override |
| public void close() throws IOException { |
| outputStream.close(); |
| } |
| } |
| } |
| |
| /** Disallow construction of utility class. */ |
| private ParquetIO() {} |
| } |