| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.beam.runners.dataflow.worker; |
| |
| import java.io.IOException; |
| import java.util.Iterator; |
| import java.util.NoSuchElementException; |
| import javax.annotation.Nullable; |
| import org.apache.beam.runners.dataflow.worker.util.common.worker.ByteArrayShufflePosition; |
| import org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader; |
| import org.apache.beam.runners.dataflow.worker.util.common.worker.ShuffleEntry; |
| import org.apache.beam.runners.dataflow.worker.util.common.worker.ShuffleEntryReader; |
| import org.apache.beam.sdk.coders.Coder; |
| import org.apache.beam.sdk.options.PipelineOptions; |
| import org.apache.beam.sdk.util.CoderUtils; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; |
| |
| /** |
| * A source that reads from a shuffled dataset, without any key grouping. Returns just the values. |
| * (This reader is for an UNGROUPED shuffle session.) |
| * |
| * @param <T> the type of the elements read from the source |
| */ |
| public class UngroupedShuffleReader<T> extends NativeReader<T> { |
| final byte[] shuffleReaderConfig; |
| final String startShufflePosition; |
| final String stopShufflePosition; |
| final Coder<T> coder; |
| private final BatchModeExecutionContext executionContext; |
| private final DataflowOperationContext operationContext; |
| |
| public UngroupedShuffleReader( |
| @SuppressWarnings("unused") PipelineOptions options, |
| byte[] shuffleReaderConfig, |
| @Nullable String startShufflePosition, |
| @Nullable String stopShufflePosition, |
| Coder<T> coder, |
| BatchModeExecutionContext executionContext, |
| DataflowOperationContext operationContext) { |
| this.shuffleReaderConfig = shuffleReaderConfig; |
| this.startShufflePosition = startShufflePosition; |
| this.stopShufflePosition = stopShufflePosition; |
| this.coder = coder; |
| this.executionContext = executionContext; |
| this.operationContext = operationContext; |
| } |
| |
| @Override |
| public NativeReaderIterator<T> iterator() throws IOException { |
| return iterator( |
| new ApplianceShuffleEntryReader( |
| shuffleReaderConfig, executionContext, operationContext, false /* no caching */)); |
| } |
| |
| /** |
| * Creates an iterator on top of the given entry reader. |
| * |
| * <p>Takes "ownership" of the reader: closes the reader once the iterator is closed. |
| */ |
| UngroupedShuffleReaderIterator<T> iterator(ShuffleEntryReader reader) { |
| return new UngroupedShuffleReaderIterator<>(this, reader); |
| } |
| |
| /** A ReaderIterator that reads from a ShuffleEntryReader and extracts just the values. */ |
| @VisibleForTesting |
| static class UngroupedShuffleReaderIterator<T> extends NativeReaderIterator<T> { |
| private Iterator<ShuffleEntry> iterator; |
| private T current; |
| private UngroupedShuffleReader<T> shuffleReader; |
| private final ShuffleEntryReader entryReader; |
| |
| UngroupedShuffleReaderIterator( |
| UngroupedShuffleReader<T> shuffleReader, ShuffleEntryReader entryReader) { |
| this.iterator = |
| entryReader.read( |
| shuffleReader.startShufflePosition == null |
| ? null |
| : ByteArrayShufflePosition.fromBase64(shuffleReader.startShufflePosition), |
| shuffleReader.stopShufflePosition == null |
| ? null |
| : ByteArrayShufflePosition.fromBase64(shuffleReader.stopShufflePosition)); |
| this.shuffleReader = shuffleReader; |
| this.entryReader = entryReader; |
| } |
| |
| @Override |
| public boolean start() throws IOException { |
| return advance(); |
| } |
| |
| @Override |
| public boolean advance() throws IOException { |
| if (!iterator.hasNext()) { |
| current = null; |
| return false; |
| } |
| ShuffleEntry record = iterator.next(); |
| // Throw away the primary and the secondary keys. |
| byte[] value = record.getValue(); |
| shuffleReader.notifyElementRead(record.length()); |
| current = CoderUtils.decodeFromByteArray(shuffleReader.coder, value); |
| return true; |
| } |
| |
| @Override |
| public T getCurrent() throws NoSuchElementException { |
| if (current == null) { |
| throw new NoSuchElementException(); |
| } |
| return current; |
| } |
| |
| @Override |
| public void close() throws IOException { |
| entryReader.close(); |
| } |
| } |
| } |