blob: 10de308f551ca488d08d51bf86616813c21a14e9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io;
import static org.apache.beam.sdk.testing.SourceTestUtils.assertSplitAtFractionExhaustive;
import static org.apache.beam.sdk.testing.SourceTestUtils.readFromSource;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
import org.apache.beam.sdk.io.OffsetBasedSource.OffsetBasedReader;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/** Tests code common to all offset-based sources. */
@RunWith(JUnit4.class)
public class OffsetBasedSourceTest {
// An offset-based source with 4 bytes per offset that yields its own current offset
// and rounds the start and end offset to the nearest multiple of a given number,
// e.g. reading [13, 48) with granularity 10 gives records with values [20, 50).
private static class CoarseRangeSource extends OffsetBasedSource<Integer> {
private long granularity;
public CoarseRangeSource(
long startOffset, long endOffset, long minBundleSize, long granularity) {
super(startOffset, endOffset, minBundleSize);
this.granularity = granularity;
}
@Override
public OffsetBasedSource<Integer> createSourceForSubrange(long start, long end) {
return new CoarseRangeSource(start, end, getMinBundleSize(), granularity);
}
@Override
public Coder<Integer> getOutputCoder() {
return BigEndianIntegerCoder.of();
}
@Override
public long getBytesPerOffset() {
return 4;
}
@Override
public long getMaxEndOffset(PipelineOptions options) {
return getEndOffset();
}
@Override
public CoarseRangeReader createReader(PipelineOptions options) {
return new CoarseRangeReader(this);
}
}
private static class CoarseRangeReader extends OffsetBasedReader<Integer> {
private long current = -1;
private long granularity;
public CoarseRangeReader(CoarseRangeSource source) {
super(source);
this.granularity = source.granularity;
}
@Override
protected long getCurrentOffset() {
return current;
}
@Override
public boolean startImpl() {
current = getCurrentSource().getStartOffset();
while (current % granularity != 0) {
++current;
}
return true;
}
@Override
public boolean advanceImpl() {
++current;
return true;
}
@Override
public Integer getCurrent() throws NoSuchElementException {
return (int) current;
}
@Override
public boolean isAtSplitPoint() {
return current % granularity == 0;
}
@Override
public void close() {}
}
public static void assertSplitsAre(
List<? extends OffsetBasedSource<?>> splits, long[] expectedBoundaries) {
assertEquals(splits.size(), expectedBoundaries.length - 1);
int i = 0;
for (OffsetBasedSource<?> split : splits) {
assertEquals(split.getStartOffset(), expectedBoundaries[i]);
assertEquals(split.getEndOffset(), expectedBoundaries[i + 1]);
i++;
}
}
@Test
public void testSplitPositionsZeroStart() throws Exception {
long start = 0;
long end = 1000;
long minBundleSize = 50;
CoarseRangeSource testSource = new CoarseRangeSource(start, end, minBundleSize, 1);
long[] boundaries = {0, 150, 300, 450, 600, 750, 900, 1000};
assertSplitsAre(testSource.split(150 * testSource.getBytesPerOffset(), null), boundaries);
}
@Test
public void testSplitPositionsNonZeroStart() throws Exception {
long start = 300;
long end = 1000;
long minBundleSize = 50;
CoarseRangeSource testSource = new CoarseRangeSource(start, end, minBundleSize, 1);
long[] boundaries = {300, 450, 600, 750, 900, 1000};
assertSplitsAre(testSource.split(150 * testSource.getBytesPerOffset(), null), boundaries);
}
@Test
public void testEstimatedSizeBytes() throws Exception {
long start = 300;
long end = 1000;
long minBundleSize = 150;
CoarseRangeSource testSource = new CoarseRangeSource(start, end, minBundleSize, 1);
PipelineOptions options = PipelineOptionsFactory.create();
assertEquals(
(end - start) * testSource.getBytesPerOffset(), testSource.getEstimatedSizeBytes(options));
}
@Test
public void testMinBundleSize() throws Exception {
long start = 300;
long end = 1000;
long minBundleSize = 150;
CoarseRangeSource testSource = new CoarseRangeSource(start, end, minBundleSize, 1);
long[] boundaries = {300, 450, 600, 750, 1000};
assertSplitsAre(testSource.split(100 * testSource.getBytesPerOffset(), null), boundaries);
}
@Test
public void testSplitPositionsCollapseEndBundle() throws Exception {
long start = 0;
long end = 1000;
long minBundleSize = 50;
CoarseRangeSource testSource = new CoarseRangeSource(start, end, minBundleSize, 1);
// Last 10 bytes should collapse to the previous bundle.
long[] boundaries = {0, 110, 220, 330, 440, 550, 660, 770, 880, 1000};
assertSplitsAre(testSource.split(110 * testSource.getBytesPerOffset(), null), boundaries);
}
@Test
public void testReadingGranularityAndFractionConsumed() throws IOException {
// Tests that the reader correctly snaps to multiples of the given granularity
// (note: this is testing test code), and that getFractionConsumed works sensibly
// in the face of that.
PipelineOptions options = PipelineOptionsFactory.create();
CoarseRangeSource source = new CoarseRangeSource(13, 35, 1, 10);
try (CoarseRangeReader reader = source.createReader(options)) {
List<Integer> items = new ArrayList<>();
assertEquals(0.0, reader.getFractionConsumed(), 1e-6);
assertTrue(reader.start());
items.add(reader.getCurrent());
while (reader.advance()) {
Double fraction = reader.getFractionConsumed();
assertNotNull(fraction);
assertTrue(fraction.toString(), fraction > 0.0);
assertTrue(fraction.toString(), fraction <= 1.0);
items.add(reader.getCurrent());
}
assertEquals(1.0, reader.getFractionConsumed(), 1e-6);
assertEquals(20, items.size());
assertEquals(20, items.get(0).intValue());
assertEquals(39, items.get(items.size() - 1).intValue());
source = new CoarseRangeSource(13, 17, 1, 10);
}
try (BoundedSource.BoundedReader<Integer> reader = source.createReader(options)) {
assertFalse(reader.start());
}
}
@Test
public void testProgress() throws IOException {
PipelineOptions options = PipelineOptionsFactory.create();
CoarseRangeSource source = new CoarseRangeSource(13, 17, 1, 2);
try (OffsetBasedReader<Integer> reader = source.createReader(options)) {
// Unstarted reader
assertEquals(0.0, reader.getFractionConsumed(), 1e-6);
assertEquals(0, reader.getSplitPointsConsumed());
assertEquals(BoundedReader.SPLIT_POINTS_UNKNOWN, reader.getSplitPointsRemaining());
// Start and produce the element 14 since granularity is 2.
assertTrue(reader.start());
assertTrue(reader.isAtSplitPoint());
assertEquals(14, reader.getCurrent().intValue());
assertEquals(0, reader.getSplitPointsConsumed());
assertEquals(BoundedReader.SPLIT_POINTS_UNKNOWN, reader.getSplitPointsRemaining());
// Advance and produce the element 15, not a split point.
assertTrue(reader.advance());
assertEquals(15, reader.getCurrent().intValue());
assertEquals(0, reader.getSplitPointsConsumed());
assertEquals(BoundedReader.SPLIT_POINTS_UNKNOWN, reader.getSplitPointsRemaining());
// Advance and produce the element 16, is a split point. Since the next offset (17) is
// outside the range [13, 17), remaining parallelism should become 1 from UNKNOWN.
assertTrue(reader.advance());
assertTrue(reader.isAtSplitPoint());
assertEquals(16, reader.getCurrent().intValue());
assertEquals(1, reader.getSplitPointsConsumed());
assertEquals(1, reader.getSplitPointsRemaining()); // The next offset is outside the range.
// Advance and produce the element 17, not a split point.
assertTrue(reader.advance());
assertEquals(17, reader.getCurrent().intValue());
assertEquals(1, reader.getSplitPointsConsumed());
assertEquals(1, reader.getSplitPointsRemaining());
// Advance and reach the end of the reader.
assertFalse(reader.advance());
assertEquals(1.0, reader.getFractionConsumed(), 1e-6);
assertEquals(2, reader.getSplitPointsConsumed());
assertEquals(0, reader.getSplitPointsRemaining());
}
}
@Test
public void testProgressEmptySource() throws IOException {
PipelineOptions options = PipelineOptionsFactory.create();
CoarseRangeSource source = new CoarseRangeSource(13, 17, 1, 100);
try (OffsetBasedReader<Integer> reader = source.createReader(options)) {
// before starting
assertEquals(0.0, reader.getFractionConsumed(), 1e-6);
assertEquals(0, reader.getSplitPointsConsumed());
assertEquals(BoundedReader.SPLIT_POINTS_UNKNOWN, reader.getSplitPointsRemaining());
// confirm empty
assertFalse(reader.start());
// after reading empty source
assertEquals(1.0, reader.getFractionConsumed(), 1e-6);
assertEquals(0, reader.getSplitPointsConsumed());
assertEquals(0, reader.getSplitPointsRemaining());
}
}
@Test
public void testSplitAtFraction() throws IOException {
PipelineOptions options = PipelineOptionsFactory.create();
CoarseRangeSource source = new CoarseRangeSource(13, 35, 1, 10);
try (CoarseRangeReader reader = source.createReader(options)) {
List<Integer> originalItems = new ArrayList<>();
assertTrue(reader.start());
originalItems.add(reader.getCurrent());
assertTrue(reader.advance());
originalItems.add(reader.getCurrent());
assertTrue(reader.advance());
originalItems.add(reader.getCurrent());
assertTrue(reader.advance());
originalItems.add(reader.getCurrent());
assertNull(reader.splitAtFraction(0.0));
assertNull(reader.splitAtFraction(reader.getFractionConsumed() - 0.1));
BoundedSource<Integer> residual = reader.splitAtFraction(reader.getFractionConsumed() + 0.1);
BoundedSource<Integer> primary = reader.getCurrentSource();
List<Integer> primaryItems = readFromSource(primary, options);
List<Integer> residualItems = readFromSource(residual, options);
for (Integer item : residualItems) {
assertTrue(item > reader.getCurrentOffset());
}
assertFalse(primaryItems.isEmpty());
assertFalse(residualItems.isEmpty());
assertTrue(primaryItems.get(primaryItems.size() - 1) <= residualItems.get(0));
while (reader.advance()) {
originalItems.add(reader.getCurrent());
}
assertEquals(originalItems, primaryItems);
}
}
@Test
public void testSplitAtFractionExhaustive() throws Exception {
PipelineOptions options = PipelineOptionsFactory.create();
CoarseRangeSource original = new CoarseRangeSource(13, 35, 1, 10);
assertSplitAtFractionExhaustive(original, options);
}
@Test
public void testEmptyOffsetRange() throws Exception {
CoarseRangeSource empty = new CoarseRangeSource(0, 0, 1, 1);
try (CoarseRangeReader reader = empty.createReader(PipelineOptionsFactory.create())) {
assertEquals(0, reader.getSplitPointsConsumed());
assertEquals(OffsetBasedReader.SPLIT_POINTS_UNKNOWN, reader.getSplitPointsRemaining());
assertEquals(0.0, reader.getFractionConsumed(), 0.0001);
assertFalse(reader.start());
assertEquals(0, reader.getSplitPointsConsumed());
assertEquals(0, reader.getSplitPointsRemaining());
assertEquals(1.0, reader.getFractionConsumed(), 0.0001);
}
}
}