blob: 983cbde63e2e4796efdb736855f5d23ebc346152 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.direct;
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static java.util.Collections.singletonMap;
import static org.apache.beam.runners.direct.DirectGraphs.getProducer;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkState;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.Executors;
import javax.annotation.Nullable;
import org.apache.beam.runners.direct.UnboundedReadDeduplicator.NeverDeduplicator;
import org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory.UnboundedSourceShard;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.BigEndianLongCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.io.CountingSource;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.testing.SourceTestUtils;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.VarInt;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ContiguousSet;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.DiscreteDomain;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.LinkedListMultimap;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Range;
import org.hamcrest.Matchers;
import org.joda.time.DateTime;
import org.joda.time.Instant;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/** Tests for {@link UnboundedReadEvaluatorFactory}. */
@RunWith(JUnit4.class)
public class UnboundedReadEvaluatorFactoryTest {
private PCollection<Long> longs;
private UnboundedReadEvaluatorFactory factory;
private EvaluationContext context;
private UncommittedBundle<Long> output;
private BundleFactory bundleFactory = ImmutableListBundleFactory.create();
private UnboundedSource<Long, ?> source;
private DirectGraph graph;
@Rule public ExpectedException thrown = ExpectedException.none();
@Rule public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
private PipelineOptions options;
@Before
public void setup() {
source = CountingSource.unboundedWithTimestampFn(new LongToInstantFn());
longs = p.apply(Read.from(source));
options = PipelineOptionsFactory.create();
context = mock(EvaluationContext.class);
factory = new UnboundedReadEvaluatorFactory(context, options);
output = bundleFactory.createBundle(longs);
graph = DirectGraphs.getGraph(p);
when(context.createBundle(longs)).thenReturn(output);
}
@Test
public void generatesInitialSplits() throws Exception {
when(context.createRootBundle()).thenAnswer(invocation -> bundleFactory.createRootBundle());
int numSplits = 5;
Collection<CommittedBundle<?>> initialInputs =
new UnboundedReadEvaluatorFactory.InputProvider(context, options)
.getInitialInputs(graph.getProducer(longs), numSplits);
// CountingSource.unbounded has very good splitting behavior
assertThat(initialInputs, hasSize(numSplits));
int readPerSplit = 100;
int totalSize = numSplits * readPerSplit;
Set<Long> expectedOutputs =
ContiguousSet.create(Range.closedOpen(0L, (long) totalSize), DiscreteDomain.longs());
Collection<Long> readItems = new ArrayList<>(totalSize);
for (CommittedBundle<?> initialInput : initialInputs) {
CommittedBundle<UnboundedSourceShard<Long, ?>> shardBundle =
(CommittedBundle<UnboundedSourceShard<Long, ?>>) initialInput;
WindowedValue<UnboundedSourceShard<Long, ?>> shard =
Iterables.getOnlyElement(shardBundle.getElements());
assertThat(shard.getTimestamp(), equalTo(BoundedWindow.TIMESTAMP_MIN_VALUE));
assertThat(shard.getWindows(), Matchers.contains(GlobalWindow.INSTANCE));
UnboundedSource<Long, ?> shardSource = shard.getValue().getSource();
readItems.addAll(
SourceTestUtils.readNItemsFromUnstartedReader(
shardSource.createReader(
PipelineOptionsFactory.create(), null /* No starting checkpoint */),
readPerSplit));
}
assertThat(readItems, containsInAnyOrder(expectedOutputs.toArray(new Long[0])));
}
@Test
public void unboundedSourceInMemoryTransformEvaluatorProducesElements() throws Exception {
when(context.createRootBundle()).thenReturn(bundleFactory.createRootBundle());
Collection<CommittedBundle<?>> initialInputs =
new UnboundedReadEvaluatorFactory.InputProvider(context, options)
.getInitialInputs(graph.getProducer(longs), 1);
CommittedBundle<?> inputShards = Iterables.getOnlyElement(initialInputs);
UnboundedSourceShard<Long, ?> inputShard =
(UnboundedSourceShard<Long, ?>)
Iterables.getOnlyElement(inputShards.getElements()).getValue();
TransformEvaluator<? super UnboundedSourceShard<Long, ?>> evaluator =
factory.forApplication(graph.getProducer(longs), inputShards);
evaluator.processElement((WindowedValue) Iterables.getOnlyElement(inputShards.getElements()));
TransformResult<? super UnboundedSourceShard<Long, ?>> result = evaluator.finishBundle();
WindowedValue<? super UnboundedSourceShard<Long, ?>> residual =
Iterables.getOnlyElement(result.getUnprocessedElements());
assertThat(residual.getTimestamp(), Matchers.lessThan(DateTime.now().toInstant()));
UnboundedSourceShard<Long, ?> residualShard =
(UnboundedSourceShard<Long, ?>) residual.getValue();
assertThat(residualShard.getSource(), equalTo(inputShard.getSource()));
assertThat(residualShard.getCheckpoint(), not(nullValue()));
assertThat(
output.commit(Instant.now()).getElements(),
containsInAnyOrder(
tgw(1L), tgw(2L), tgw(4L), tgw(8L), tgw(9L), tgw(7L), tgw(6L), tgw(5L), tgw(3L),
tgw(0L)));
}
@Test
public void unboundedSourceWithDuplicatesMultipleCalls() throws Exception {
Long[] outputs = new Long[20];
for (long i = 0L; i < 20L; i++) {
outputs[(int) i] = i % 5L;
}
TestUnboundedSource<Long> source = new TestUnboundedSource<>(BigEndianLongCoder.of(), outputs);
source.dedupes = true;
PCollection<Long> pcollection = p.apply(Read.from(source));
AppliedPTransform<?, ?, ?> sourceTransform = getProducer(pcollection);
when(context.createRootBundle()).thenReturn(bundleFactory.createRootBundle());
Collection<CommittedBundle<?>> initialInputs =
new UnboundedReadEvaluatorFactory.InputProvider(context, options)
.getInitialInputs(sourceTransform, 1);
UncommittedBundle<Long> output = bundleFactory.createBundle(pcollection);
when(context.createBundle(pcollection)).thenReturn(output);
CommittedBundle<?> inputBundle = Iterables.getOnlyElement(initialInputs);
TransformEvaluator<UnboundedSourceShard<Long, TestCheckpointMark>> evaluator =
factory.forApplication(sourceTransform, inputBundle);
for (WindowedValue<?> value : inputBundle.getElements()) {
evaluator.processElement(
(WindowedValue<UnboundedSourceShard<Long, TestCheckpointMark>>) value);
}
TransformResult<UnboundedSourceShard<Long, TestCheckpointMark>> result =
evaluator.finishBundle();
assertThat(
output.commit(Instant.now()).getElements(),
containsInAnyOrder(tgw(1L), tgw(2L), tgw(4L), tgw(3L), tgw(0L)));
UncommittedBundle<Long> secondOutput = bundleFactory.createBundle(longs);
when(context.createBundle(longs)).thenReturn(secondOutput);
TransformEvaluator<UnboundedSourceShard<Long, TestCheckpointMark>> secondEvaluator =
factory.forApplication(sourceTransform, inputBundle);
WindowedValue<UnboundedSourceShard<Long, TestCheckpointMark>> residual =
(WindowedValue<UnboundedSourceShard<Long, TestCheckpointMark>>)
Iterables.getOnlyElement(result.getUnprocessedElements());
secondEvaluator.processElement(residual);
secondEvaluator.finishBundle();
assertThat(secondOutput.commit(Instant.now()).getElements(), Matchers.emptyIterable());
}
@Test
public void noElementsAvailableReaderIncludedInResidual() throws Exception {
// Read with a very slow rate so by the second read there are no more elements
PCollection<Long> pcollection =
p.apply(Read.from(new TestUnboundedSource<>(VarLongCoder.of(), 1L)));
AppliedPTransform<?, ?, ?> sourceTransform = getProducer(pcollection);
when(context.createRootBundle()).thenReturn(bundleFactory.createRootBundle());
Collection<CommittedBundle<?>> initialInputs =
new UnboundedReadEvaluatorFactory.InputProvider(context, options)
.getInitialInputs(sourceTransform, 1);
// Process the initial shard. This might produce some output, and will produce a residual shard
// which should produce no output when read from within the following day.
when(context.createBundle(pcollection)).thenReturn(bundleFactory.createBundle(pcollection));
CommittedBundle<?> inputBundle = Iterables.getOnlyElement(initialInputs);
TransformEvaluator<UnboundedSourceShard<Long, TestCheckpointMark>> evaluator =
factory.forApplication(sourceTransform, inputBundle);
for (WindowedValue<?> value : inputBundle.getElements()) {
evaluator.processElement(
(WindowedValue<UnboundedSourceShard<Long, TestCheckpointMark>>) value);
}
TransformResult<UnboundedSourceShard<Long, TestCheckpointMark>> result =
evaluator.finishBundle();
// Read from the residual of the first read. This should not produce any output, but should
// include a residual shard in the result.
UncommittedBundle<Long> secondOutput = bundleFactory.createBundle(longs);
when(context.createBundle(longs)).thenReturn(secondOutput);
TransformEvaluator<UnboundedSourceShard<Long, TestCheckpointMark>> secondEvaluator =
factory.forApplication(sourceTransform, inputBundle);
WindowedValue<UnboundedSourceShard<Long, TestCheckpointMark>> residual =
(WindowedValue<UnboundedSourceShard<Long, TestCheckpointMark>>)
Iterables.getOnlyElement(result.getUnprocessedElements());
secondEvaluator.processElement(residual);
TransformResult<UnboundedSourceShard<Long, TestCheckpointMark>> secondResult =
secondEvaluator.finishBundle();
// Sanity check that nothing was output (The test would have to run for more than a day to do
// so correctly.)
assertThat(secondOutput.commit(Instant.now()).getElements(), Matchers.emptyIterable());
// Test that even though the reader produced no outputs, there is still a residual shard with
// the updated watermark.
WindowedValue<UnboundedSourceShard<Long, TestCheckpointMark>> unprocessed =
(WindowedValue<UnboundedSourceShard<Long, TestCheckpointMark>>)
Iterables.getOnlyElement(secondResult.getUnprocessedElements());
assertThat(unprocessed.getTimestamp(), Matchers.greaterThan(residual.getTimestamp()));
assertThat(unprocessed.getValue().getExistingReader(), not(nullValue()));
}
@Test
public void evaluatorReusesReaderAndClosesAtTheEnd() throws Exception {
int numElements = 1000;
ContiguousSet<Long> elems =
ContiguousSet.create(Range.openClosed(0L, (long) numElements), DiscreteDomain.longs());
TestUnboundedSource<Long> source =
new TestUnboundedSource<>(BigEndianLongCoder.of(), elems.toArray(new Long[0]));
source.advanceWatermarkToInfinity = true;
PCollection<Long> pcollection = p.apply(Read.from(source));
DirectGraph graph = DirectGraphs.getGraph(p);
AppliedPTransform<?, ?, ?> sourceTransform = graph.getProducer(pcollection);
when(context.createRootBundle()).thenReturn(bundleFactory.createRootBundle());
UncommittedBundle<Long> output = mock(UncommittedBundle.class);
when(context.createBundle(pcollection)).thenReturn(output);
WindowedValue<UnboundedSourceShard<Long, TestCheckpointMark>> shard =
WindowedValue.valueInGlobalWindow(
UnboundedSourceShard.unstarted(source, NeverDeduplicator.create()));
CommittedBundle<UnboundedSourceShard<Long, TestCheckpointMark>> inputBundle =
bundleFactory
.<UnboundedSourceShard<Long, TestCheckpointMark>>createRootBundle()
.add(shard)
.commit(Instant.now());
UnboundedReadEvaluatorFactory factory =
new UnboundedReadEvaluatorFactory(context, options, 1.0 /* Always reuse */);
new UnboundedReadEvaluatorFactory.InputProvider(context, options)
.getInitialInputs(sourceTransform, 1);
CommittedBundle<UnboundedSourceShard<Long, TestCheckpointMark>> residual = inputBundle;
do {
TransformEvaluator<UnboundedSourceShard<Long, TestCheckpointMark>> evaluator =
factory.forApplication(sourceTransform, residual);
evaluator.processElement(Iterables.getOnlyElement(residual.getElements()));
TransformResult<UnboundedSourceShard<Long, TestCheckpointMark>> result =
evaluator.finishBundle();
residual =
inputBundle.withElements(
(Iterable<WindowedValue<UnboundedSourceShard<Long, TestCheckpointMark>>>)
result.getUnprocessedElements());
} while (!Iterables.isEmpty(residual.getElements()));
verify(output, times(numElements)).add(any());
assertThat(TestUnboundedSource.readerCreatedCount, equalTo(1));
assertThat(TestUnboundedSource.readerClosedCount, equalTo(1));
}
@Test
public void evaluatorClosesReaderAndResumesFromCheckpoint() throws Exception {
ContiguousSet<Long> elems = ContiguousSet.create(Range.closed(0L, 20L), DiscreteDomain.longs());
TestUnboundedSource<Long> source =
new TestUnboundedSource<>(BigEndianLongCoder.of(), elems.toArray(new Long[0]));
PCollection<Long> pcollection = p.apply(Read.from(source));
AppliedPTransform<?, ?, ?> sourceTransform = DirectGraphs.getGraph(p).getProducer(pcollection);
when(context.createRootBundle()).thenReturn(bundleFactory.createRootBundle());
UncommittedBundle<Long> output = bundleFactory.createBundle(pcollection);
when(context.createBundle(pcollection)).thenReturn(output);
WindowedValue<UnboundedSourceShard<Long, TestCheckpointMark>> shard =
WindowedValue.valueInGlobalWindow(
UnboundedSourceShard.unstarted(source, NeverDeduplicator.create()));
CommittedBundle<UnboundedSourceShard<Long, TestCheckpointMark>> inputBundle =
bundleFactory
.<UnboundedSourceShard<Long, TestCheckpointMark>>createRootBundle()
.add(shard)
.commit(Instant.now());
UnboundedReadEvaluatorFactory factory =
new UnboundedReadEvaluatorFactory(context, options, 0.0 /* never reuse */);
TransformEvaluator<UnboundedSourceShard<Long, TestCheckpointMark>> evaluator =
factory.forApplication(sourceTransform, inputBundle);
evaluator.processElement(shard);
TransformResult<UnboundedSourceShard<Long, TestCheckpointMark>> result =
evaluator.finishBundle();
CommittedBundle<UnboundedSourceShard<Long, TestCheckpointMark>> residual =
inputBundle.withElements(
(Iterable<WindowedValue<UnboundedSourceShard<Long, TestCheckpointMark>>>)
result.getUnprocessedElements());
TransformEvaluator<UnboundedSourceShard<Long, TestCheckpointMark>> secondEvaluator =
factory.forApplication(sourceTransform, residual);
secondEvaluator.processElement(Iterables.getOnlyElement(residual.getElements()));
secondEvaluator.finishBundle();
assertThat(TestUnboundedSource.readerClosedCount, equalTo(2));
assertThat(
Iterables.getOnlyElement(residual.getElements()).getValue().getCheckpoint().isFinalized(),
is(true));
}
@Test
public void evaluatorThrowsInCloseRethrows() throws Exception {
ContiguousSet<Long> elems = ContiguousSet.create(Range.closed(0L, 20L), DiscreteDomain.longs());
TestUnboundedSource<Long> source =
new TestUnboundedSource<>(BigEndianLongCoder.of(), elems.toArray(new Long[0]))
.throwsOnClose();
PCollection<Long> pcollection = p.apply(Read.from(source));
AppliedPTransform<?, ?, ?> sourceTransform = DirectGraphs.getGraph(p).getProducer(pcollection);
when(context.createRootBundle()).thenReturn(bundleFactory.createRootBundle());
UncommittedBundle<Long> output = bundleFactory.createBundle(pcollection);
when(context.createBundle(pcollection)).thenReturn(output);
WindowedValue<UnboundedSourceShard<Long, TestCheckpointMark>> shard =
WindowedValue.valueInGlobalWindow(
UnboundedSourceShard.unstarted(source, NeverDeduplicator.create()));
CommittedBundle<UnboundedSourceShard<Long, TestCheckpointMark>> inputBundle =
bundleFactory
.<UnboundedSourceShard<Long, TestCheckpointMark>>createRootBundle()
.add(shard)
.commit(Instant.now());
UnboundedReadEvaluatorFactory factory =
new UnboundedReadEvaluatorFactory(context, options, 0.0 /* never reuse */);
TransformEvaluator<UnboundedSourceShard<Long, TestCheckpointMark>> evaluator =
factory.forApplication(sourceTransform, inputBundle);
thrown.expect(IOException.class);
thrown.expectMessage("throws on close");
evaluator.processElement(shard);
}
@Test // before this was throwing a NPE
public void emptySource() throws Exception {
TestUnboundedSource.readerClosedCount = 0;
final TestUnboundedSource<String> source = new TestUnboundedSource<>(StringUtf8Coder.of());
source.advanceWatermarkToInfinity = true;
processElement(source);
assertEquals(1, TestUnboundedSource.readerClosedCount);
TestUnboundedSource.readerClosedCount = 0; // reset
}
@Test(expected = IOException.class)
public void sourceThrowingException() throws Exception {
final TestUnboundedSource<String> source = new TestUnboundedSource<>(StringUtf8Coder.of());
source.advanceWatermarkToInfinity = true;
source.throwOnClose = true;
processElement(source);
}
private void processElement(final TestUnboundedSource<String> source) throws Exception {
final EvaluationContext context =
EvaluationContext.create(
MockClock.fromInstant(Instant.now()),
CloningBundleFactory.create(),
DirectGraph.create(
emptyMap(), emptyMap(), LinkedListMultimap.create(), emptySet(), emptyMap()),
emptySet(),
Executors.newCachedThreadPool());
final UnboundedReadEvaluatorFactory factory =
new UnboundedReadEvaluatorFactory(context, options);
final Read.Unbounded<String> unbounded = Read.from(source);
final Pipeline pipeline = Pipeline.create(options);
final PCollection<String> pCollection = pipeline.apply(unbounded);
final AppliedPTransform<PBegin, PCollection<String>, Read.Unbounded<String>> application =
AppliedPTransform.of(
"test",
new HashMap<>(),
singletonMap(new TupleTag(), pCollection),
unbounded,
pipeline);
final TransformEvaluator<UnboundedSourceShard<String, TestCheckpointMark>> evaluator =
factory.forApplication(application, null);
final UnboundedSource.UnboundedReader<String> reader = source.createReader(options, null);
final UnboundedSourceShard<String, TestCheckpointMark> shard =
UnboundedSourceShard.of(source, new NeverDeduplicator(), reader, null);
final WindowedValue<UnboundedSourceShard<String, TestCheckpointMark>> value =
WindowedValue.of(
shard, BoundedWindow.TIMESTAMP_MAX_VALUE, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING);
TestUnboundedSource.readerClosedCount = 0;
evaluator.processElement(value);
}
/**
* A terse alias for producing timestamped longs in the {@link GlobalWindow}, where the timestamp
* is the epoch offset by the value of the element.
*/
private static WindowedValue<Long> tgw(Long elem) {
return WindowedValue.timestampedValueInGlobalWindow(elem, new Instant(elem));
}
private static class LongToInstantFn implements SerializableFunction<Long, Instant> {
@Override
public Instant apply(Long input) {
return new Instant(input);
}
}
private static class TestUnboundedSource<T> extends UnboundedSource<T, TestCheckpointMark> {
private static int getWatermarkCalls = 0;
static int readerCreatedCount;
static int readerClosedCount;
static int readerAdvancedCount;
private final Coder<T> coder;
private final List<T> elems;
private boolean dedupes = false;
private boolean advanceWatermarkToInfinity = false; // After reaching end of input.
private boolean throwOnClose;
public TestUnboundedSource(Coder<T> coder, T... elems) {
this(coder, false, Arrays.asList(elems));
}
private TestUnboundedSource(Coder<T> coder, boolean throwOnClose, List<T> elems) {
readerCreatedCount = 0;
readerClosedCount = 0;
readerAdvancedCount = 0;
this.coder = coder;
this.elems = elems;
this.throwOnClose = throwOnClose;
}
@Override
public List<? extends UnboundedSource<T, TestCheckpointMark>> split(
int desiredNumSplits, PipelineOptions options) throws Exception {
return ImmutableList.of(this);
}
@Override
public UnboundedSource.UnboundedReader<T> createReader(
PipelineOptions options, @Nullable TestCheckpointMark checkpointMark) {
checkState(
checkpointMark == null || checkpointMark.decoded,
"Cannot resume from a checkpoint that has not been decoded");
readerCreatedCount++;
return new TestUnboundedReader(elems, checkpointMark == null ? -1 : checkpointMark.index);
}
@Override
@Nullable
public Coder<TestCheckpointMark> getCheckpointMarkCoder() {
return new TestCheckpointMark.Coder();
}
@Override
public boolean requiresDeduping() {
return dedupes;
}
@Override
public Coder<T> getOutputCoder() {
return coder;
}
public TestUnboundedSource<T> throwsOnClose() {
return new TestUnboundedSource<>(coder, true, elems);
}
private class TestUnboundedReader extends UnboundedReader<T> {
private final List<T> elems;
private int index;
private boolean closed = false;
public TestUnboundedReader(List<T> elems, int startIndex) {
this.elems = elems;
this.index = startIndex;
}
@Override
public boolean start() throws IOException {
return advance();
}
@Override
public boolean advance() throws IOException {
readerAdvancedCount++;
if (index + 1 < elems.size()) {
index++;
return true;
}
return false;
}
@Override
public Instant getWatermark() {
getWatermarkCalls++;
if (index + 1 == elems.size() && TestUnboundedSource.this.advanceWatermarkToInfinity) {
return BoundedWindow.TIMESTAMP_MAX_VALUE;
} else {
return new Instant(index + getWatermarkCalls);
}
}
@Override
public CheckpointMark getCheckpointMark() {
return new TestCheckpointMark(index);
}
@Override
public UnboundedSource<T, ?> getCurrentSource() {
return TestUnboundedSource.this;
}
@Override
public T getCurrent() throws NoSuchElementException {
return elems.get(index);
}
@Override
public Instant getCurrentTimestamp() throws NoSuchElementException {
return new Instant(index);
}
@Override
public byte[] getCurrentRecordId() {
try {
return CoderUtils.encodeToByteArray(coder, getCurrent());
} catch (CoderException e) {
throw new RuntimeException(e);
}
}
@Override
public void close() throws IOException {
try {
readerClosedCount++;
// Enforce the AutoCloseable contract. Close is not idempotent.
assertThat(closed, is(false));
if (throwOnClose) {
throw new IOException(String.format("%s throws on close", TestUnboundedSource.this));
}
} finally {
closed = true;
}
}
}
}
private static class TestCheckpointMark implements CheckpointMark {
final int index;
private boolean finalized = false;
private boolean decoded = false;
private TestCheckpointMark(int index) {
this.index = index;
}
@Override
public void finalizeCheckpoint() throws IOException {
checkState(
!finalized, "%s was finalized more than once", TestCheckpointMark.class.getSimpleName());
checkState(
!decoded,
"%s was finalized after being decoded",
TestCheckpointMark.class.getSimpleName());
finalized = true;
}
boolean isFinalized() {
return finalized;
}
public static class Coder extends AtomicCoder<TestCheckpointMark> {
@Override
public void encode(TestCheckpointMark value, OutputStream outStream) throws IOException {
VarInt.encode(value.index, outStream);
}
@Override
public TestCheckpointMark decode(InputStream inStream) throws IOException {
TestCheckpointMark decoded = new TestCheckpointMark(VarInt.decodeInt(inStream));
decoded.decoded = true;
return decoded;
}
}
}
}