| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.beam.runners.dataflow.worker; |
| |
| import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; |
| |
| import com.google.api.services.dataflow.model.ApproximateReportedProgress; |
| import com.google.api.services.dataflow.model.ApproximateSplitRequest; |
| import com.google.api.services.dataflow.model.ReportedParallelism; |
| import java.io.IOException; |
| import java.nio.ByteBuffer; |
| import java.util.NoSuchElementException; |
| import java.util.Optional; |
| import org.apache.avro.Schema; |
| import org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader; |
| import org.apache.beam.sdk.coders.Coder; |
| import org.apache.beam.sdk.io.AvroSource; |
| import org.apache.beam.sdk.io.AvroSource.AvroReader; |
| import org.apache.beam.sdk.io.BoundedSource; |
| import org.apache.beam.sdk.io.FileSystems; |
| import org.apache.beam.sdk.io.OffsetBasedSource; |
| import org.apache.beam.sdk.options.PipelineOptions; |
| import org.apache.beam.sdk.util.CoderUtils; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * A source that reads PCollections that have been materialized as Avro files. Records are read from |
| * the Avro file as a series of byte arrays. The coder provided is used to deserialize each record |
| * from a byte array. |
| * |
| * @param <T> the type of the elements read from the source |
| */ |
| public class AvroByteReader<T> extends NativeReader<T> { |
| private static final Logger LOG = LoggerFactory.getLogger(AvroByteReader.class); |
| |
| final long startPosition; |
| final long endPosition; |
| final String filename; |
| final AvroSource<ByteBuffer> avroSource; |
| final PipelineOptions options; |
| |
| final Coder<T> coder; |
| private final Schema schema = Schema.create(Schema.Type.BYTES); |
| |
| AvroByteReader( |
| String filename, |
| long startPosition, |
| long endPosition, |
| Coder<T> coder, |
| PipelineOptions options) { |
| checkArgument(filename != null, "filename must not be null"); |
| checkArgument(coder != null, "coder must not be null"); |
| checkArgument(options != null, "options must not be null"); |
| this.filename = filename; |
| this.startPosition = startPosition; |
| this.endPosition = endPosition; |
| this.coder = coder; |
| this.options = options; |
| this.avroSource = |
| (AvroSource<ByteBuffer>) ((AvroSource) AvroSource.from(filename).withSchema(schema)); |
| } |
| |
| @Override |
| public AvroByteFileIterator iterator() throws IOException { |
| BoundedSource.BoundedReader<ByteBuffer> reader; |
| if (startPosition == 0 && endPosition == Long.MAX_VALUE) { |
| // Read entire file (or collection of files). |
| reader = avroSource.createReader(options); |
| } else { |
| // Read a subrange of file. |
| reader = |
| avroSource |
| .createForSubrangeOfFile( |
| FileSystems.matchSingleFileSpec(filename), startPosition, endPosition) |
| .createReader(options); |
| } |
| return new AvroByteFileIterator((AvroReader<ByteBuffer>) reader); |
| } |
| |
| class AvroByteFileIterator extends NativeReaderIterator<T> { |
| private final AvroReader<ByteBuffer> reader; |
| private Optional<T> current; |
| |
| public AvroByteFileIterator(AvroReader<ByteBuffer> reader) { |
| this.reader = reader; |
| } |
| |
| @Override |
| public boolean start() throws IOException { |
| if (!reader.start()) { |
| current = Optional.empty(); |
| return false; |
| } |
| updateCurrent(); |
| return true; |
| } |
| |
| @Override |
| public boolean advance() throws IOException { |
| if (!reader.advance()) { |
| current = Optional.empty(); |
| return false; |
| } |
| updateCurrent(); |
| return true; |
| } |
| |
| private void updateCurrent() throws IOException { |
| ByteBuffer inBuffer = reader.getCurrent(); |
| notifyElementRead(inBuffer.remaining()); |
| byte[] encodedElem = new byte[inBuffer.remaining()]; |
| inBuffer.get(encodedElem); |
| current = Optional.of(CoderUtils.decodeFromByteArray(coder, encodedElem)); |
| } |
| |
| @Override |
| public T getCurrent() throws NoSuchElementException { |
| return current.get(); |
| } |
| |
| @Override |
| public Progress getProgress() { |
| Double readerProgress = reader.getFractionConsumed(); |
| if (readerProgress == null) { |
| return null; |
| } |
| ApproximateReportedProgress progress = new ApproximateReportedProgress(); |
| progress.setFractionConsumed(readerProgress); |
| double consumedParallelism = reader.getSplitPointsConsumed(); |
| double remainingParallelism = reader.getSplitPointsRemaining(); |
| progress.setConsumedParallelism(new ReportedParallelism().setValue(consumedParallelism)); |
| if (remainingParallelism >= 0) { |
| progress.setRemainingParallelism(new ReportedParallelism().setValue(remainingParallelism)); |
| } |
| return SourceTranslationUtils.cloudProgressToReaderProgress(progress); |
| } |
| |
| @Override |
| public void close() throws IOException { |
| reader.close(); |
| } |
| |
| @Override |
| public DynamicSplitResult requestDynamicSplit(DynamicSplitRequest splitRequest) { |
| ApproximateSplitRequest splitProgress = |
| SourceTranslationUtils.splitRequestToApproximateSplitRequest(splitRequest); |
| double splitAtFraction = splitProgress.getFractionConsumed(); |
| LOG.info("Received request for dynamic split at {}", splitAtFraction); |
| OffsetBasedSource<ByteBuffer> residual = reader.splitAtFraction(splitAtFraction); |
| if (residual == null) { |
| LOG.info("Rejected split request for split at {}", splitAtFraction); |
| return null; |
| } |
| com.google.api.services.dataflow.model.Position acceptedPosition = |
| new com.google.api.services.dataflow.model.Position(); |
| acceptedPosition.setByteOffset(residual.getStartOffset()); |
| LOG.info( |
| "Accepted split for position {} which resulted in a new source with byte offset {}", |
| splitAtFraction, |
| residual.getStartOffset()); |
| return new DynamicSplitResultWithPosition( |
| SourceTranslationUtils.cloudPositionToReaderPosition(acceptedPosition)); |
| } |
| } |
| } |