blob: b3f2c5e8ae7d9f4a58a58cf6a962bea9715a955c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io;
import static org.apache.beam.sdk.testing.SerializableMatchers.greaterThanOrEqualTo;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includesDisplayDataFor;
import static org.hamcrest.MatcherAssert.assertThat;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.io.CountingSource.CounterMark;
import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.UsesUnboundedPCollections;
import org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/** Tests for {@link Read}. */
@RunWith(JUnit4.class)
@SuppressWarnings({
"rawtypes", // TODO(https://issues.apache.org/jira/browse/BEAM-10556)
})
public class ReadTest implements Serializable {
private static final Map<String, List<Instant>> STATIC_INSTANT_LIST_MAP =
new ConcurrentHashMap<>();
@Rule public transient ExpectedException thrown = ExpectedException.none();
@Rule public final transient TestPipeline pipeline = TestPipeline.create();
@Test
public void testInstantiationOfBoundedSourceAsSDFWrapper() {
DoFn dofn = new Read.BoundedSourceAsSDFWrapperFn<>();
DoFnInvokers.invokerFor(dofn);
}
@Test
public void failsWhenCustomBoundedSourceIsNotSerializable() {
thrown.expect(IllegalArgumentException.class);
Read.from(new NotSerializableBoundedSource());
}
@Test
public void succeedsWhenCustomBoundedSourceIsSerializable() {
Read.from(new SerializableBoundedSource());
}
@Test
public void failsWhenCustomUnboundedSourceIsNotSerializable() {
thrown.expect(IllegalArgumentException.class);
Read.from(new NotSerializableUnboundedSource());
}
@Test
public void succeedsWhenCustomUnboundedSourceIsSerializable() {
Read.from(new SerializableUnboundedSource());
}
@Test
public void testDisplayData() {
SerializableBoundedSource boundedSource =
new SerializableBoundedSource() {
@Override
public void populateDisplayData(DisplayData.Builder builder) {
builder.add(DisplayData.item("foo", "bar"));
}
};
SerializableUnboundedSource unboundedSource =
new SerializableUnboundedSource() {
@Override
public void populateDisplayData(DisplayData.Builder builder) {
builder.add(DisplayData.item("foo", "bar"));
}
};
Duration maxReadTime = Duration.standardMinutes(2345);
Read.Bounded<String> bounded = Read.from(boundedSource);
BoundedReadFromUnboundedSource<String> unbounded =
Read.from(unboundedSource).withMaxNumRecords(1234).withMaxReadTime(maxReadTime);
DisplayData boundedDisplayData = DisplayData.from(bounded);
assertThat(boundedDisplayData, hasDisplayItem("source", boundedSource.getClass()));
assertThat(boundedDisplayData, includesDisplayDataFor("source", boundedSource));
DisplayData unboundedDisplayData = DisplayData.from(unbounded);
assertThat(unboundedDisplayData, hasDisplayItem("source", unboundedSource.getClass()));
assertThat(unboundedDisplayData, includesDisplayDataFor("source", unboundedSource));
assertThat(unboundedDisplayData, hasDisplayItem("maxRecords", 1234));
assertThat(unboundedDisplayData, hasDisplayItem("maxReadTime", maxReadTime));
}
@Test
@Category({
NeedsRunner.class,
UsesUnboundedPCollections.class,
UsesUnboundedSplittableParDo.class
})
public void testUnboundedSdfWrapperCacheStartedReaders() {
long numElements = 1000L;
PCollection<Long> input =
pipeline.apply(Read.from(new ExpectCacheUnboundedSource(numElements)));
PAssert.that(input)
.containsInAnyOrder(
LongStream.rangeClosed(1L, numElements).boxed().collect(Collectors.toList()));
// Force the pipeline to run with one thread to ensure the reader will be reused on one DoFn
// instance.
// We are not able to use DirectOptions because of circular dependency.
pipeline
.runWithAdditionalOptionArgs(ImmutableList.of("--targetParallelism=1"))
.waitUntilFinish();
}
@Test
@Category({
NeedsRunner.class,
UsesUnboundedPCollections.class,
UsesUnboundedSplittableParDo.class
})
public void testWatermarkAdvanceOnClaimFail() {
// NOTE: this test is supposed to run only against DirectRunner
// as for other runners it might not be working the interception of watermark
// through the STATIC_INSTANT_LIST_MAP
int numElements = 1000;
final String uuid = UUID.randomUUID().toString();
List<Instant> interceptedWatermark =
STATIC_INSTANT_LIST_MAP.computeIfAbsent(uuid, tmp -> new ArrayList<>());
PCollection<Long> counted =
pipeline
.apply(
newUnboundedReadInterceptingWatermark(
numElements,
(Serializable & Consumer<Instant>)
(instant -> STATIC_INSTANT_LIST_MAP.get(uuid).add(instant))))
.apply(
Window.<Long>into(new GlobalWindows())
.discardingFiredPanes()
.triggering(AfterWatermark.pastEndOfWindow()))
.apply(Count.globally());
PAssert.that(counted).containsInAnyOrder((long) numElements);
pipeline.run().waitUntilFinish();
// verify that the observed watermark gradually moves
assertThat(interceptedWatermark.size(), greaterThanOrEqualTo(numElements));
Instant watermark = interceptedWatermark.get(0);
for (int i = 1; i < interceptedWatermark.size(); i++) {
assertThat(
"Watermarks should be non-decreasing sequence, got " + interceptedWatermark,
!watermark.isAfter(interceptedWatermark.get(i)));
watermark = interceptedWatermark.get(i);
}
}
private <T extends Serializable & Consumer<Instant>>
Read.Unbounded<Long> newUnboundedReadInterceptingWatermark(
long numElements, T interceptedWatermarkReceiver) {
UnboundedLongSource source = new UnboundedLongSource(numElements);
return new Read.Unbounded<Long>(null, source) {
@Override
@SuppressWarnings("unchecked")
Read.UnboundedSourceAsSDFWrapperFn<Long, CheckpointMark> createUnboundedSdfWrapper() {
return new Read.UnboundedSourceAsSDFWrapperFn<Long, CheckpointMark>(
(Coder) source.getCheckpointMarkCoder()) {
@Override
public WatermarkEstimators.Manual newWatermarkEstimator(Instant watermarkEstimatorState) {
return new WatermarkEstimators.Manual(watermarkEstimatorState) {
@Override
public void setWatermark(Instant watermark) {
super.setWatermark(watermark);
interceptedWatermarkReceiver.accept(watermark);
}
};
}
};
}
};
}
private abstract static class CustomBoundedSource extends BoundedSource<String> {
@Override
public List<? extends BoundedSource<String>> split(
long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
return null;
}
@Override
public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
return 0;
}
@Override
public BoundedReader<String> createReader(PipelineOptions options) throws IOException {
return null;
}
@Override
public Coder<String> getOutputCoder() {
return StringUtf8Coder.of();
}
}
private static class NotSerializableBoundedSource extends CustomBoundedSource {
@SuppressWarnings("unused")
private final NotSerializableClass notSerializableClass = new NotSerializableClass();
}
private static class SerializableBoundedSource extends CustomBoundedSource {}
private static class ExpectCacheUnboundedSource
extends UnboundedSource<Long, CountingSource.CounterMark> {
private final long numElements;
ExpectCacheUnboundedSource(long numElements) {
this.numElements = numElements;
}
@Override
public List<? extends UnboundedSource<Long, CounterMark>> split(
int desiredNumSplits, PipelineOptions options) throws Exception {
return ImmutableList.of(this);
}
@Override
public UnboundedReader<Long> createReader(
PipelineOptions options, @Nullable CounterMark checkpointMark) throws IOException {
if (checkpointMark != null) {
throw new IOException("The reader should be retrieved from cache instead of a new one");
}
return new ExpectCacheReader(this, checkpointMark);
}
@Override
public Coder<Long> getOutputCoder() {
return VarLongCoder.of();
}
@Override
public Coder<CounterMark> getCheckpointMarkCoder() {
return AvroCoder.of(CountingSource.CounterMark.class);
}
}
private static class ExpectCacheReader extends UnboundedReader<Long> {
private long current;
private ExpectCacheUnboundedSource source;
ExpectCacheReader(ExpectCacheUnboundedSource source, CounterMark checkpointMark) {
this.source = source;
if (checkpointMark == null) {
current = 0L;
} else {
current = checkpointMark.getLastEmitted();
}
}
@Override
public boolean start() throws IOException {
return advance();
}
@Override
public boolean advance() throws IOException {
current += 1;
if (current > source.numElements) {
return false;
}
return true;
}
@Override
public Long getCurrent() throws NoSuchElementException {
return current;
}
@Override
public Instant getCurrentTimestamp() throws NoSuchElementException {
return getWatermark();
}
@Override
public void close() throws IOException {}
@Override
public Instant getWatermark() {
if (current > source.numElements) {
return BoundedWindow.TIMESTAMP_MAX_VALUE;
}
return BoundedWindow.TIMESTAMP_MIN_VALUE;
}
@Override
public CheckpointMark getCheckpointMark() {
if (current <= 0) {
return null;
}
return new CounterMark(current, BoundedWindow.TIMESTAMP_MIN_VALUE);
}
@Override
public UnboundedSource<Long, ?> getCurrentSource() {
return source;
}
}
private abstract static class CustomUnboundedSource
extends UnboundedSource<String, NoOpCheckpointMark> {
@Override
public List<? extends UnboundedSource<String, NoOpCheckpointMark>> split(
int desiredNumSplits, PipelineOptions options) throws Exception {
return null;
}
@Override
public UnboundedReader<String> createReader(
PipelineOptions options, NoOpCheckpointMark checkpointMark) {
return null;
}
@Override
public @Nullable Coder<NoOpCheckpointMark> getCheckpointMarkCoder() {
return null;
}
@Override
public boolean requiresDeduping() {
return true;
}
@Override
public Coder<String> getOutputCoder() {
return StringUtf8Coder.of();
}
}
private static class NoOpCheckpointMark implements CheckpointMark {
@Override
public void finalizeCheckpoint() throws IOException {}
}
private static class NotSerializableUnboundedSource extends CustomUnboundedSource {
@SuppressWarnings("unused")
private final NotSerializableClass notSerializableClass = new NotSerializableClass();
}
private static class SerializableUnboundedSource extends CustomUnboundedSource {}
private static class NotSerializableClass {}
private static class OffsetCheckpointMark implements CheckpointMark {
private static final Coder<OffsetCheckpointMark> CODER =
new CustomCoder<OffsetCheckpointMark>() {
private final VarLongCoder longCoder = VarLongCoder.of();
@Override
public void encode(OffsetCheckpointMark value, OutputStream outStream)
throws CoderException, IOException {
longCoder.encode(value.offset, outStream);
}
@Override
public OffsetCheckpointMark decode(InputStream inStream)
throws CoderException, IOException {
return new OffsetCheckpointMark(longCoder.decode(inStream));
}
};
private final long offset;
OffsetCheckpointMark(Long offset) {
this.offset = MoreObjects.firstNonNull(offset, -1L);
}
@Override
public void finalizeCheckpoint() {}
}
private class UnboundedLongSource extends UnboundedSource<Long, OffsetCheckpointMark> {
private final long numElements;
public UnboundedLongSource(long numElements) {
this.numElements = numElements;
}
@Override
public List<? extends UnboundedSource<Long, OffsetCheckpointMark>> split(
int desiredNumSplits, PipelineOptions options) {
return Collections.singletonList(this);
}
@Override
public UnboundedReader<Long> createReader(
PipelineOptions options, @Nullable OffsetCheckpointMark checkpointMark) {
return new UnboundedLongSourceReader(
Optional.ofNullable(checkpointMark).map(m -> m.offset).orElse(-1L));
}
@Override
public Coder<Long> getOutputCoder() {
return VarLongCoder.of();
}
@Override
public Coder<OffsetCheckpointMark> getCheckpointMarkCoder() {
return OffsetCheckpointMark.CODER;
}
private class UnboundedLongSourceReader extends UnboundedReader<Long> {
private final Instant now = Instant.now();
private long current;
UnboundedLongSourceReader(long current) {
this.current = current;
}
@Override
public Long getCurrent() throws NoSuchElementException {
return current;
}
@Override
public Instant getCurrentTimestamp() throws NoSuchElementException {
return current < 0 ? now : now.plus(current);
}
@Override
public void close() throws IOException {}
@Override
public boolean start() throws IOException {
return advance();
}
@Override
public boolean advance() throws IOException {
return ++current < numElements;
}
@Override
public Instant getWatermark() {
return current < numElements ? getCurrentTimestamp() : BoundedWindow.TIMESTAMP_MAX_VALUE;
}
@Override
public CheckpointMark getCheckpointMark() {
return new OffsetCheckpointMark(current);
}
@Override
public UnboundedSource<Long, ?> getCurrentSource() {
return UnboundedLongSource.this;
}
}
}
}