blob: 36df60de4f1f101ac368f8dc86c20bbc8852765a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.dataflow.worker;
import static com.google.api.client.util.Preconditions.checkNotNull;
import static org.apache.beam.runners.dataflow.worker.SourceTranslationUtils.cloudPositionToReaderPosition;
import static org.apache.beam.runners.dataflow.worker.SourceTranslationUtils.cloudProgressToReaderProgress;
import static org.apache.beam.runners.dataflow.worker.SourceTranslationUtils.splitRequestToApproximateSplitRequest;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
import com.google.api.services.dataflow.model.ApproximateReportedProgress;
import com.google.api.services.dataflow.model.ApproximateSplitRequest;
import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.IOException;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
import org.apache.beam.runners.core.metrics.ExecutionStateTracker.ExecutionState;
import org.apache.beam.runners.dataflow.worker.ExperimentContext.Experiment;
import org.apache.beam.runners.dataflow.worker.counters.Counter;
import org.apache.beam.runners.dataflow.worker.counters.CounterName;
import org.apache.beam.runners.dataflow.worker.util.ValueInEmptyWindows;
import org.apache.beam.runners.dataflow.worker.util.common.worker.ByteArrayShufflePosition;
import org.apache.beam.runners.dataflow.worker.util.common.worker.GroupingShuffleEntryIterator;
import org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader;
import org.apache.beam.runners.dataflow.worker.util.common.worker.ShuffleEntry;
import org.apache.beam.runners.dataflow.worker.util.common.worker.ShuffleEntryReader;
import org.apache.beam.runners.dataflow.worker.util.common.worker.ShuffleReadCounter;
import org.apache.beam.runners.dataflow.worker.util.common.worker.ShuffleReadCounterFactory;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
import org.apache.beam.sdk.util.common.ElementByteSizeObservableIterable;
import org.apache.beam.sdk.util.common.ElementByteSizeObservableIterator;
import org.apache.beam.sdk.util.common.Reiterable;
import org.apache.beam.sdk.util.common.Reiterator;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A source that reads from a shuffled dataset and yields key-grouped data.
*
* @param <K> the type of the keys read from the shuffle
* @param <V> the type of the values read from the shuffle
*/
public class GroupingShuffleReader<K, V> extends NativeReader<WindowedValue<KV<K, Reiterable<V>>>> {
private static final Logger LOG = LoggerFactory.getLogger(GroupingShuffleReader.class);
private final PipelineOptions options;
final byte[] shuffleReaderConfig;
@Nullable final String startShufflePosition;
@Nullable final String stopShufflePosition;
final BatchModeExecutionContext executionContext;
private final DataflowOperationContext operationContext;
private final ShuffleReadCounterFactory shuffleReadCounterFactory;
// Counts how many bytes were from by a given operation from a given shuffle session.
@Nullable Counter<Long, Long> perOperationPerDatasetBytesCounter;
Coder<K> keyCoder;
Coder<?> valueCoder;
@Nullable Coder<?> secondaryKeyCoder;
public GroupingShuffleReader(
PipelineOptions options,
byte[] shuffleReaderConfig,
@Nullable String startShufflePosition,
@Nullable String stopShufflePosition,
Coder<WindowedValue<KV<K, Iterable<V>>>> coder,
BatchModeExecutionContext executionContext,
DataflowOperationContext operationContext,
ShuffleReadCounterFactory shuffleReadCounterFactory,
boolean valuesAreSorted)
throws Exception {
this.options = options;
this.shuffleReaderConfig = shuffleReaderConfig;
this.startShufflePosition = startShufflePosition;
this.stopShufflePosition = stopShufflePosition;
this.executionContext = executionContext;
this.operationContext = operationContext;
this.shuffleReadCounterFactory = shuffleReadCounterFactory;
initCoder(coder, valuesAreSorted);
// We cannot initialize perOperationPerDatasetBytesCounter here, as it
// depends on shuffleReaderConfig, which isn't populated yet.
}
private synchronized void initCounter(String datasetId) {
if (perOperationPerDatasetBytesCounter == null) {
perOperationPerDatasetBytesCounter =
operationContext
.counterFactory()
.longSum(
CounterName.named(
String.format(
"dax-shuffle-%s-wf-%s-read-bytes",
datasetId, operationContext.nameContext().systemName())));
}
}
@Override
public GroupingShuffleReaderIterator<K, V> iterator() throws IOException {
ApplianceShuffleEntryReader entryReader =
new ApplianceShuffleEntryReader(
shuffleReaderConfig, executionContext, operationContext, true);
initCounter(entryReader.getDatasetId());
return iterator(entryReader);
}
private void initCoder(Coder<WindowedValue<KV<K, Iterable<V>>>> coder, boolean valuesAreSorted)
throws Exception {
if (!(coder instanceof WindowedValueCoder)) {
throw new Exception("unexpected kind of coder for WindowedValue: " + coder);
}
Coder<KV<K, Iterable<V>>> elemCoder =
((WindowedValueCoder<KV<K, Iterable<V>>>) coder).getValueCoder();
if (!(elemCoder instanceof KvCoder)) {
throw new Exception(
"unexpected kind of coder for elements read from "
+ "a key-grouping shuffle: "
+ elemCoder);
}
@SuppressWarnings("unchecked")
KvCoder<K, Iterable<V>> kvCoder = (KvCoder<K, Iterable<V>>) elemCoder;
this.keyCoder = kvCoder.getKeyCoder();
Coder<Iterable<V>> kvValueCoder = kvCoder.getValueCoder();
if (!(kvValueCoder instanceof IterableCoder)) {
throw new Exception(
"unexpected kind of coder for values of KVs read from " + "a key-grouping shuffle");
}
IterableCoder<V> iterCoder = (IterableCoder<V>) kvValueCoder;
if (valuesAreSorted) {
checkState(
iterCoder.getElemCoder() instanceof KvCoder,
"unexpected kind of coder for elements read from a "
+ "key-grouping value sorting shuffle: %s",
iterCoder.getElemCoder());
@SuppressWarnings("rawtypes")
KvCoder<?, ?> valueKvCoder = (KvCoder) iterCoder.getElemCoder();
this.secondaryKeyCoder = valueKvCoder.getKeyCoder();
this.valueCoder = valueKvCoder.getValueCoder();
} else {
this.valueCoder = iterCoder.getElemCoder();
}
}
/**
* Creates an iterator on top of the given entry reader.
*
* <p>Takes "ownership" of the reader: closes the reader once the iterator is closed.
*/
final GroupingShuffleReaderIterator<K, V> iterator(ShuffleEntryReader reader) {
return new GroupingShuffleReaderIterator<K, V>(this, reader);
}
/**
* A ReaderIterator that reads from a ShuffleEntryReader and groups all the values with the same
* key.
*
* <p>A key limitation of this implementation is that all iterator accesses must by externally
* synchronized (the iterator objects are not individually thread-safe, and the iterators derived
* from a single original iterator access shared state that is not thread-safe).
*
* <p>To access the current position, the iterator must advance on-demand and cache the next batch
* of key grouped shuffle entries. The iterator does not advance a second time in @next() to avoid
* asking the underlying iterator to advance to the next key before the caller/user iterates over
* the values corresponding to the current key, which would introduce a performance penalty.
*/
@VisibleForTesting
static final class GroupingShuffleReaderIterator<K, V>
extends NativeReaderIterator<WindowedValue<KV<K, Reiterable<V>>>> {
// The enclosing GroupingShuffleReader.
private final GroupingShuffleReader<K, V> parentReader;
private final ShuffleEntryReader entryReader;
/** The iterator over shuffle entries, grouped by common key. */
private final GroupingShuffleEntryIterator groups;
private final AtomicLong currentGroupSize = new AtomicLong(0L);
private final ShuffleReadCounter shuffleReadCounter;
private ExecutionStateTracker tracker;
private ExecutionState readState;
private WindowedValue<KV<K, Reiterable<V>>> current;
public GroupingShuffleReaderIterator(
final GroupingShuffleReader<K, V> parentReader, ShuffleEntryReader entryReader) {
this.parentReader = parentReader;
this.entryReader = entryReader;
this.readState =
parentReader.operationContext.newExecutionState(ExecutionStateTracker.PROCESS_STATE_NAME);
this.tracker = parentReader.executionContext.getExecutionStateTracker();
String baseShuffleOriginalStepName =
parentReader.operationContext.nameContext().originalName();
// Note: Some usage of the GroupingShuffleReader may not set the
// shuffleReadCounterFactory.
ExperimentContext ec = ExperimentContext.parseFrom(parentReader.options);
if (parentReader.shuffleReadCounterFactory == null) {
this.shuffleReadCounter = null;
} else {
this.shuffleReadCounter =
parentReader.shuffleReadCounterFactory.create(
baseShuffleOriginalStepName,
ec.isEnabled(Experiment.IntertransformIO),
parentReader.perOperationPerDatasetBytesCounter);
}
try (Closeable read = tracker.enterState(readState)) {
this.groups =
new GroupingShuffleEntryIterator(
entryReader,
parentReader.startShufflePosition == null
? null
: ByteArrayShufflePosition.fromBase64(parentReader.startShufflePosition),
parentReader.stopShufflePosition == null
? null
: ByteArrayShufflePosition.fromBase64(parentReader.stopShufflePosition)) {
@Override
protected void notifyElementRead(long byteSize) {
// We accumulate the sum of bytes read in a local variable. This sum will be counted
// when the values are actually read by the consumer of the shuffle reader.
currentGroupSize.addAndGet(byteSize);
parentReader.notifyElementRead(byteSize);
}
@Override
protected void commitBytesRead(long bytes) {
if (shuffleReadCounter != null) {
shuffleReadCounter.addBytesRead(bytes);
}
}
};
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public boolean start() throws IOException {
return advance();
}
@Override
public boolean advance() throws IOException {
try (Closeable read = tracker.enterState(readState)) {
if (!groups.advance()) {
current = null;
return false;
}
}
K key = CoderUtils.decodeFromByteArray(parentReader.keyCoder, groups.getCurrent().key);
parentReader.executionContext.setKey(key);
current =
new ValueInEmptyWindows<>(
KV.<K, Reiterable<V>>of(key, new ValuesIterable(groups.getCurrent().values)));
return true;
}
@Override
public WindowedValue<KV<K, Reiterable<V>>> getCurrent() throws NoSuchElementException {
if (current == null) {
throw new NoSuchElementException();
}
return current;
}
/**
* Returns the position before the next {@code KV<K, Reiterable<V>>} to be returned by the
* {@link GroupingShuffleReaderIterator}. Returns null if the {@link
* GroupingShuffleReaderIterator} is finished.
*/
@Override
public Progress getProgress() {
com.google.api.services.dataflow.model.Position position =
new com.google.api.services.dataflow.model.Position();
ApproximateReportedProgress progress = new ApproximateReportedProgress();
ByteArrayShufflePosition groupStart =
(ByteArrayShufflePosition) groups.getRangeTracker().getLastGroupStart();
if (groupStart != null) {
position.setShufflePosition(groupStart.encodeBase64());
progress.setPosition(position);
}
com.google.api.services.dataflow.model.ReportedParallelism consumedParallelism =
new com.google.api.services.dataflow.model.ReportedParallelism();
consumedParallelism.setValue((double) groups.getRangeTracker().getSplitPointsProcessed());
progress.setConsumedParallelism(consumedParallelism);
return cloudProgressToReaderProgress(progress);
}
/**
* Updates the stop position of the shuffle source to the position proposed. Ignores the
* proposed stop position if it is smaller than or equal to the position before the next {@code
* KV<K, Reiterable<V>>} to be returned by the {@link GroupingShuffleReaderIterator}.
*/
@Override
public DynamicSplitResult requestDynamicSplit(DynamicSplitRequest splitRequest) {
checkNotNull(splitRequest);
ApproximateSplitRequest splitProgress = splitRequestToApproximateSplitRequest(splitRequest);
com.google.api.services.dataflow.model.Position splitPosition = splitProgress.getPosition();
if (splitPosition == null) {
LOG.warn(
"GroupingShuffleReader only supports split at a Position. Requested: {}", splitRequest);
return null;
}
String splitShufflePosition = splitPosition.getShufflePosition();
if (splitShufflePosition == null) {
LOG.warn(
"GroupingShuffleReader only supports split at a shuffle position. Requested: {}",
splitPosition);
return null;
}
ByteArrayShufflePosition newStopPosition =
ByteArrayShufflePosition.fromBase64(splitShufflePosition);
if (groups.trySplitAtPosition(newStopPosition)) {
return new DynamicSplitResultWithPosition(cloudPositionToReaderPosition(splitPosition));
} else {
return null;
}
}
@Override
public void close() throws IOException {
entryReader.close();
}
/**
* Provides the {@link Reiterable} used to iterate through the values part of a {@code KV<K,
* Reiterable<V>>} entry produced by a {@link GroupingShuffleReader}.
*/
private final class ValuesIterable extends ElementByteSizeObservableIterable<V, ValuesIterator>
implements Reiterable<V> {
// N.B. This class is *not* static; it uses the valueCoder from
// its enclosing GroupingShuffleReader.
private final Reiterable<ShuffleEntry> base;
public ValuesIterable(Reiterable<ShuffleEntry> base) {
this.base = checkNotNull(base);
}
@Override
public ValuesIterator iterator() {
return new ValuesIterator(base.iterator());
}
@Override
protected ValuesIterator createIterator() {
return iterator();
}
}
/**
* Provides the {@link Reiterator} used to iterate through the values part of a {@code KV<K,
* Reiterable<V>>} entry produced by a {@link GroupingShuffleReader}.
*/
private final class ValuesIterator extends ElementByteSizeObservableIterator<V>
implements Reiterator<V> {
// N.B. This class is *not* static; it uses the valueCoder from
// its enclosing GroupingShuffleReader.
private final Reiterator<ShuffleEntry> base;
public ValuesIterator(Reiterator<ShuffleEntry> base) {
this.base = checkNotNull(base);
}
@Override
public boolean hasNext() {
try (Closeable read = tracker.enterState(readState)) {
return base.hasNext();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public V next() {
try (Closeable read = tracker.enterState(readState)) {
ShuffleEntry entry = base.next();
checkNotNull(entry);
// The shuffle entries are handed over to the consumer of this iterator. Therefore, we can
// notify the bytes that have been read so far.
notifyValueReturned(currentGroupSize.getAndSet(0L));
try {
if (parentReader.secondaryKeyCoder != null) {
ByteArrayInputStream bais = new ByteArrayInputStream(entry.getSecondaryKey());
@SuppressWarnings("unchecked")
V value =
(V)
KV.of(
// We ignore decoding the timestamp.
parentReader.secondaryKeyCoder.decode(bais),
CoderUtils.decodeFromByteArray(
parentReader.valueCoder, entry.getValue()));
return value;
} else {
@SuppressWarnings("unchecked")
V value =
(V) CoderUtils.decodeFromByteArray(parentReader.valueCoder, entry.getValue());
return value;
}
} catch (IOException exn) {
throw new RuntimeException(exn);
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public void remove() {
base.remove();
}
@Override
public ValuesIterator copy() {
return new ValuesIterator(base.copy());
}
}
}
}