| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.beam.runners.dataflow.worker.util.common.worker; |
| |
| import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkNotNull; |
| import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkState; |
| |
| import java.util.Arrays; |
| import java.util.NoSuchElementException; |
| import javax.annotation.Nullable; |
| import org.apache.beam.sdk.util.common.ElementByteSizeObservableIterable; |
| import org.apache.beam.sdk.util.common.ElementByteSizeObservableIterator; |
| import org.apache.beam.sdk.util.common.Reiterable; |
| import org.apache.beam.sdk.util.common.Reiterator; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * An iterator through KeyGroupedShuffleEntries. |
| * |
| * <p>Much like a {@link NativeReader.NativeReaderIterator}, but without {@code start()}, and not |
| * used via the interface of that class, hence doesn't inherit it. |
| */ |
| public abstract class GroupingShuffleEntryIterator { |
| private static final Logger LOG = LoggerFactory.getLogger(GroupingShuffleEntryIterator.class); |
| |
| /** The iterator through the underlying shuffle records. */ |
| private Reiterator<ShuffleEntry> shuffleIterator; |
| |
| private final GroupingShuffleRangeTracker rangeTracker; |
| |
| /** |
| * The key of the most recent KeyGroupedShuffleEntries returned by {@link #advance}, if any. |
| * |
| * <p>If currentKeyBytes is non-null, then it's the key for the last entry returned by {@link |
| * #advance}, and all incoming entries with that key should be skipped over by this iterator |
| * (since this iterator is iterating over keys, not the individual values associated with a given |
| * key). |
| * |
| * <p>If currentKeyBytes is null, and shuffleIterator.hasNext(), then the key of |
| * shuffleIterator.next() is the key of the next KeyGroupedShuffleEntries to return via {@link |
| * #advance}/{@link #getCurrent}. |
| */ |
| @Nullable private byte[] currentKeyBytes = null; |
| |
| private ShufflePosition lastGroupStart; |
| |
| /** |
| * The size of the shuffle entries read so far for the current key (if currentKeyBytes is |
| * non-null), or the previous key (if currentKeyBytes is null). |
| */ |
| private long totalByteSizeOfEntriesForCurrentKey = 0L; |
| |
| private KeyGroupedShuffleEntries current = null; |
| private boolean isFinished = false; |
| |
| /** |
| * Constructs a GroupingShuffleEntryIterator, given a Reiterator over ungrouped ShuffleEntries, |
| * assuming the ungrouped ShuffleEntries for a given key are consecutive. The counter given as |
| * argument, if non-null, will be updated with the byte size of the entries read. |
| */ |
| public GroupingShuffleEntryIterator( |
| ShuffleEntryReader entryReader, ShufflePosition startPosition, ShufflePosition stopPosition) { |
| this.rangeTracker = new GroupingShuffleRangeTracker(startPosition, stopPosition); |
| this.shuffleIterator = |
| new ProgressTrackingReiterator<>( |
| entryReader.read(startPosition, stopPosition), |
| new ProgressTrackerGroup<ShuffleEntry>() { |
| @Override |
| protected void report(ShuffleEntry entry) { |
| notifyElementRead(entry.length()); |
| } |
| }.start()); |
| } |
| |
| /** |
| * Notifies observers about a new ShuffleEntry (key and value, not key and iterable of values) |
| * read. |
| */ |
| protected abstract void notifyElementRead(long byteSize); |
| |
| /** Notifies observers about the bytes read that are to be committed to the counters. */ |
| protected abstract void commitBytesRead(long bytesRead); |
| |
| public boolean advance() { |
| if (isFinished) { |
| return false; |
| } |
| Reiterator<ShuffleEntry> atCurrentEntry; |
| ShuffleEntry entry; |
| // Skip entries with the same key - we're an iterator over keys, and advancement through |
| // values of the same key happens through ValuesIterable instead. |
| while (true) { |
| if (!shuffleIterator.hasNext()) { |
| return markFinishedAndCommitBytesCounter(); |
| } |
| // Save an iterator to the current entry, in case it's in a different key |
| // than the previous entry. |
| // If it's in the same key, we just skip it; if it's a new key, we'll need to |
| // start ValuesIterable below from this entry. |
| atCurrentEntry = shuffleIterator.copy(); |
| entry = shuffleIterator.next(); |
| if (!Arrays.equals(entry.getKey(), currentKeyBytes)) { |
| break; |
| } |
| // Note: we can get here only if the ValuesIterable of the preceding key has NOT been |
| // read fully. If it had, then the "fast forward" code path in ValuesIterable |
| // would have triggered instead, and would prevent this path by setting currentKeyBytes |
| // to null, and we'd have exited the loop via the inequality check above. |
| totalByteSizeOfEntriesForCurrentKey += entry.length(); |
| } |
| |
| // Now "atCurrentEntry" points to "entry", and "entry" is the first entry in the next key. |
| ShufflePosition groupStart = entry.getPosition(); |
| boolean isAtSplitPoint = (groupStart == null) || !groupStart.equals(lastGroupStart); |
| if (!rangeTracker.tryReturnRecordAt(isAtSplitPoint, groupStart)) { |
| return markFinishedAndCommitBytesCounter(); |
| } |
| lastGroupStart = groupStart; |
| |
| commitBytesRead(totalByteSizeOfEntriesForCurrentKey); |
| totalByteSizeOfEntriesForCurrentKey = entry.length(); |
| |
| currentKeyBytes = entry.getKey(); |
| current = |
| new KeyGroupedShuffleEntries( |
| entry.getPosition(), |
| currentKeyBytes, |
| new ValuesIterable(this, currentKeyBytes, atCurrentEntry)); |
| |
| return true; |
| } |
| |
| private boolean markFinishedAndCommitBytesCounter() { |
| commitBytesRead(totalByteSizeOfEntriesForCurrentKey); |
| |
| current = null; |
| isFinished = true; |
| return false; |
| } |
| |
| public KeyGroupedShuffleEntries getCurrent() { |
| if (current == null) { |
| throw new NoSuchElementException(); |
| } |
| return current; |
| } |
| |
| public boolean finished() { |
| return isFinished; |
| } |
| |
| public boolean trySplitAtPosition(ShufflePosition newStopPosition) { |
| if (rangeTracker.trySplitAtPosition(newStopPosition)) { |
| LOG.info( |
| "Split GroupingShuffleReader at {}, now {}", newStopPosition.toString(), rangeTracker); |
| return true; |
| } else { |
| LOG.debug( |
| "Will not split GroupingShuffleReader {} at {}", |
| rangeTracker, |
| newStopPosition.toString()); |
| return false; |
| } |
| } |
| |
| public GroupingShuffleRangeTracker getRangeTracker() { |
| return rangeTracker; |
| } |
| |
| private static class ValuesIterable |
| extends ElementByteSizeObservableIterable<ShuffleEntry, ValuesIterator> |
| implements Reiterable<ShuffleEntry> { |
| private final GroupingShuffleEntryIterator parent; |
| private final byte[] currentKeyBytes; |
| private Reiterator<ShuffleEntry> baseValuesIterator; |
| |
| public ValuesIterable( |
| GroupingShuffleEntryIterator parent, |
| byte[] keyBytes, |
| Reiterator<ShuffleEntry> baseValuesIterator) { |
| this.parent = parent; |
| this.currentKeyBytes = keyBytes; |
| this.baseValuesIterator = baseValuesIterator; |
| } |
| |
| @Override |
| public ValuesIterator createIterator() { |
| return new ValuesIterator(parent, baseValuesIterator.copy(), currentKeyBytes); |
| } |
| } |
| |
| /** |
| * Provides the {@link Reiterator} used to iterate through the shuffle entries of a |
| * KeyGroupedShuffleEntries. |
| */ |
| private static class ValuesIterator extends ElementByteSizeObservableIterator<ShuffleEntry> |
| implements Reiterator<ShuffleEntry> { |
| private final GroupingShuffleEntryIterator parent; |
| private final Reiterator<ShuffleEntry> valuesIterator; |
| private final ProgressTracker<ShuffleEntry> tracker; |
| private final byte[] expectedKeyBytes; |
| |
| private Boolean cachedHasNext; |
| private long byteSizeRead = 0L; |
| private ShuffleEntry current; |
| |
| public ValuesIterator( |
| GroupingShuffleEntryIterator parent, |
| Reiterator<ShuffleEntry> valuesIterator, |
| byte[] expectedKeyBytes) { |
| this.parent = parent; |
| this.valuesIterator = valuesIterator; |
| this.expectedKeyBytes = checkNotNull(expectedKeyBytes); |
| // N.B. The ProgressTrackerGroup captures the reference to the original |
| // ValuesIterator for a given values iteration, which happens to be |
| // exactly what we want, since this is also the ValuesIterator whose |
| // base Observable has the references to all of the Observers watching |
| // the iteration. Copied ValuesIterator instances do *not* have these |
| // Observers, but that's fine, since the derived ProgressTracker |
| // instances reference the ProgressTrackerGroup, which references the |
| // original ValuesIterator, which does have them. |
| this.tracker = |
| new ProgressTrackerGroup<ShuffleEntry>() { |
| @Override |
| protected void report(ShuffleEntry entry) { |
| ValuesIterator.this.notifyValueReturned(entry.length()); |
| } |
| }.start(); |
| } |
| |
| private ValuesIterator( |
| GroupingShuffleEntryIterator parent, ValuesIterator it, long byteSizeRead) { |
| this.parent = parent; |
| this.valuesIterator = it.valuesIterator.copy(); |
| this.tracker = it.tracker.copy(); |
| this.expectedKeyBytes = it.expectedKeyBytes; |
| this.cachedHasNext = it.cachedHasNext; |
| this.byteSizeRead = byteSizeRead; |
| this.current = it.current; |
| } |
| |
| @Override |
| public boolean hasNext() { |
| if (cachedHasNext != null) { |
| return cachedHasNext; |
| } |
| cachedHasNext = advance(); |
| return cachedHasNext; |
| } |
| |
| private boolean advance() { |
| // Save a copy of the iterator pointing at the next entry, to use below in case we're right |
| // before a key boundary (or end of stream). |
| Reiterator<ShuffleEntry> possibleStartOfNextKey = valuesIterator.copy(); |
| if (valuesIterator.hasNext()) { |
| ShuffleEntry entry = valuesIterator.next(); |
| if (Arrays.equals(entry.getKey(), expectedKeyBytes)) { |
| byteSizeRead += entry.length(); |
| tracker.saw(entry); |
| current = entry; |
| return true; |
| } |
| } |
| |
| // Fast-forwarding: |
| // |
| // This ValuesIterator is the iterator for the Iterable<V> accompanying the current K |
| // for the parent (which is an Iterable<KV<K, Iterable<V>>>). |
| // When ValuesIterator was created, the parent iterator was seeked to the beginning of |
| // this Iterable<V>. |
| // |
| // Now that we've reached the end, we can fast-forward the parent iterator to our iterator |
| // which points to the next key (or end of stream). Otherwise, parent's advance() would have |
| // to read through all entries of K1 again just to skip them. |
| // |
| // "expectedKeyBytes" is initialized from parent.currentKeyBytes by reference, |
| // and the byte array is not mutated, so a reference equality check is sufficient |
| // to guard against two adversary situations: |
| // 1) when the parent GroupingShuffleEntryIterator has been advanced to the next key |
| // between calls to hasNext/next on the current ValuesIterator. |
| // 2) when another ValuesIterator produced from the same parent at a different key |
| // has fast-forwarded it to the successor of that key. |
| // (note that if this "antagonist" ValuesIterator was at the same key, then this |
| // codepath may trigger more than once, hence the body of the if statement should |
| // be idempotent) |
| if (expectedKeyBytes == parent.currentKeyBytes) { |
| // Fast-forward the parent's shuffle iterator. |
| parent.shuffleIterator = possibleStartOfNextKey; |
| // The parent is also tracking the total byte size read. Since we have been tracking this |
| // amount in byteSizeRead, we can just set it to the right value. |
| parent.totalByteSizeOfEntriesForCurrentKey = byteSizeRead; |
| // We need to do both of these things for the fast-forward to be correct. |
| } |
| |
| return false; |
| } |
| |
| @Override |
| public ShuffleEntry next() { |
| if (!hasNext()) { |
| throw new NoSuchElementException(); |
| } |
| checkState(current != null, "current entry should be non-null"); |
| ShuffleEntry toReturn = current; |
| cachedHasNext = null; |
| current = null; |
| return toReturn; |
| } |
| |
| @Override |
| public ValuesIterator copy() { |
| return new ValuesIterator(parent, this, byteSizeRead); |
| } |
| |
| @Override |
| public void remove() { |
| throw new UnsupportedOperationException(); |
| } |
| } |
| } |