[BEAM-9650] Add PeriodicSequence generator. (#11477)
* Add PeriodicSequence generator.
Add java snippet for slowly updating side inputs.
Co-authored-by: Mikhail Gryzykhin <mikhail@apache.org>
diff --git a/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java b/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java
index e100166..3fca68c 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java
@@ -51,10 +51,13 @@
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.PeriodicImpulse;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.Watch;
@@ -68,7 +71,6 @@
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.transforms.windowing.WindowFn.MergeContext;
import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
@@ -785,4 +787,72 @@
}
}
+
+ public static class PeriodicallyUpdatingSideInputs {
+
+ public static PCollection<Long> main(
+ Pipeline p,
+ Instant startAt,
+ Instant stopAt,
+ Duration interval1,
+ Duration interval2,
+ String fileToRead) {
+ // [START PeriodicallyUpdatingSideInputs]
+ PCollectionView<List<Long>> sideInput =
+ p.apply(
+ "SIImpulse",
+ PeriodicImpulse.create()
+ .startAt(startAt)
+ .stopAt(stopAt)
+ .withInterval(interval1)
+ .applyWindowing())
+ .apply(
+ "FileToRead",
+ ParDo.of(
+ new DoFn<Instant, String>() {
+ @DoFn.ProcessElement
+ public void process(@Element Instant notUsed, OutputReceiver<String> o) {
+ o.output(fileToRead);
+ }
+ }))
+ .apply(FileIO.matchAll())
+ .apply(FileIO.readMatches())
+ .apply(TextIO.readFiles())
+ .apply(
+ ParDo.of(
+ new DoFn<String, String>() {
+ @ProcessElement
+ public void process(@Element String src, OutputReceiver<String> o) {
+ o.output(src);
+ }
+ }))
+ .apply(Combine.globally(Count.<String>combineFn()).withoutDefaults())
+ .apply(View.asList());
+
+ PCollection<Instant> mainInput =
+ p.apply(
+ "MIImpulse",
+ PeriodicImpulse.create()
+ .startAt(startAt.minus(Duration.standardSeconds(1)))
+ .stopAt(stopAt.minus(Duration.standardSeconds(1)))
+ .withInterval(interval2)
+ .applyWindowing());
+
+ // Consume side input. GenerateSequence generates test data.
+ // Use a real source (like PubSubIO or KafkaIO) in production.
+ PCollection<Long> result =
+ mainInput.apply(
+ "generateOutput",
+ ParDo.of(
+ new DoFn<Instant, Long>() {
+ @ProcessElement
+ public void process(ProcessContext c) {
+ c.output((long) c.sideInput(sideInput).size());
+ }
+ })
+ .withSideInputs(sideInput));
+ // [END PeriodicallyUpdatingSideInputs]
+ return result;
+ }
+ }
}
diff --git a/examples/java/src/test/java/org/apache/beam/examples/snippets/SnippetsTest.java b/examples/java/src/test/java/org/apache/beam/examples/snippets/SnippetsTest.java
index 0cf53a3c..b65840f 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/snippets/SnippetsTest.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/snippets/SnippetsTest.java
@@ -17,23 +17,33 @@
*/
package org.apache.beam.examples.snippets;
+import java.io.BufferedWriter;
+import java.io.File;
import java.io.IOException;
import java.io.Serializable;
+import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.UsesStatefulParDo;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.Files;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
+import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -143,4 +153,40 @@
p.run();
}
+
+ @Test
+ @Category({NeedsRunner.class, UsesStatefulParDo.class})
+ public void testSlowlyUpdatingSideInputsWindowed() {
+ Instant startAt = Instant.now().minus(Duration.standardMinutes(3));
+ Duration duration = Duration.standardSeconds(10);
+ Instant stopAt = startAt.plus(duration);
+ Duration interval1 = Duration.standardSeconds(1);
+ Duration interval2 = Duration.standardSeconds(1);
+
+ File f = null;
+ try {
+ f = File.createTempFile("testSlowlyUpdatingSIWindowed", "txt");
+ try (BufferedWriter fw = Files.newWriter(f, Charset.forName("UTF-8"))) {
+ fw.append("testdata");
+ }
+ } catch (IOException e) {
+ Assert.fail("failed to create temp file: " + e.toString());
+ throw new RuntimeException("Should never reach here");
+ }
+
+ PCollection<Long> result =
+ Snippets.PeriodicallyUpdatingSideInputs.main(
+ p, startAt, stopAt, interval1, interval2, f.getPath());
+
+ ArrayList<Long> expectedResults = new ArrayList<Long>();
+ expectedResults.add(0L);
+ for (Long i = startAt.getMillis(); i < stopAt.getMillis(); i = i + interval2.getMillis()) {
+ expectedResults.add(1L);
+ }
+
+ PAssert.that(result).containsInAnyOrder(expectedResults);
+
+ p.run().waitUntilFinish();
+ f.deleteOnExit();
+ }
}
diff --git a/runners/direct-java/build.gradle b/runners/direct-java/build.gradle
index 98dca8e..02eb5bf 100644
--- a/runners/direct-java/build.gradle
+++ b/runners/direct-java/build.gradle
@@ -95,6 +95,8 @@
group = "Verification"
description = "Runs tests that require a runner to validate that piplines/transforms work correctly"
+ testLogging.showStandardStreams = true
+
def pipelineOptions = JsonOutput.toJson(["--runner=DirectRunner", "--runnerDeterminedSharding=false"])
systemProperty "beamTestPipelineOptions", pipelineOptions
@@ -112,6 +114,10 @@
excludeCategories "org.apache.beam.sdk.testing.UsesCrossLanguageTransforms"
excludeCategories 'org.apache.beam.sdk.testing.UsesBundleFinalizer'
}
+ testLogging {
+ outputs.upToDateWhen {false}
+ showStandardStreams = true
+ }
}
// NOTE: This will also run 'NeedsRunner' tests, which are run in the :needsRunnerTests task as well.
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicImpulse.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicImpulse.java
new file mode 100644
index 0000000..fc836bc
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicImpulse.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * A {@link PTransform} which produces a sequence of elements at fixed runtime intervals.
+ *
+ * <p>If applyWindowing() is specified, each element will be assigned to its own fixed window.
+ *
+ * <p>See {@link PeriodicSequence}.
+ */
+public class PeriodicImpulse extends PTransform<PBegin, PCollection<Instant>> {
+
+ Instant startTimestamp = Instant.now();
+ Instant stopTimestamp = BoundedWindow.TIMESTAMP_MAX_VALUE;
+ Duration fireInterval = Duration.standardMinutes(1);
+ boolean applyWindowing = false;
+
+ private PeriodicImpulse() {}
+
+ public static PeriodicImpulse create() {
+ return new PeriodicImpulse();
+ }
+
+ public PeriodicImpulse startAt(Instant startTime) {
+ this.startTimestamp = startTime;
+ return this;
+ }
+
+ public PeriodicImpulse stopAt(Instant stopTime) {
+ this.stopTimestamp = stopTime;
+ return this;
+ }
+
+ public PeriodicImpulse withInterval(Duration interval) {
+ this.fireInterval = interval;
+ return this;
+ }
+
+ public PeriodicImpulse applyWindowing() {
+ this.applyWindowing = true;
+ return this;
+ }
+
+ @Override
+ public PCollection<Instant> expand(PBegin input) {
+ PCollection<Instant> result =
+ input
+ .apply(
+ Create.<PeriodicSequence.SequenceDefinition>of(
+ new PeriodicSequence.SequenceDefinition(
+ startTimestamp, stopTimestamp, fireInterval)))
+ .apply(PeriodicSequence.create());
+
+ if (this.applyWindowing) {
+ result =
+ result.apply(
+ Window.<Instant>into(FixedWindows.of(Duration.millis(fireInterval.getMillis()))));
+ }
+
+ return result;
+ }
+}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicSequence.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicSequence.java
new file mode 100644
index 0000000..bcf1b57
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicSequence.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import java.util.Objects;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.schemas.JavaFieldSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * A {@link PTransform} which generates a sequence of timestamped elements at given runtime
+ * intervals.
+ *
+ * <p>Transform assigns each element some timestamp and will only output element when worker clock
+ * reach given timestamp. Transform will not output elements prior to target time. Transform can
+ * output elements at any time after target time.
+ */
+@Experimental(Experimental.Kind.SPLITTABLE_DO_FN)
+public class PeriodicSequence
+ extends PTransform<PCollection<PeriodicSequence.SequenceDefinition>, PCollection<Instant>> {
+
+ @DefaultSchema(JavaFieldSchema.class)
+ public static class SequenceDefinition {
+ public Instant first;
+ public Instant last;
+ public Long durationMilliSec;
+
+ public SequenceDefinition() {}
+
+ public SequenceDefinition(Instant first, Instant last, Duration duration) {
+ this.first = first;
+ this.last = last;
+ this.durationMilliSec = duration.getMillis();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+
+ if (obj == null || obj.getClass() != this.getClass()) {
+ return false;
+ }
+
+ SequenceDefinition src = (SequenceDefinition) obj;
+ return src.first.equals(this.first)
+ && src.last.equals(this.last)
+ && src.durationMilliSec.equals(this.durationMilliSec);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = Objects.hash(first, last, durationMilliSec);
+ return result;
+ }
+ }
+
+ private PeriodicSequence() {}
+
+ public static PeriodicSequence create() {
+ return new PeriodicSequence();
+ }
+
+ @Experimental(Experimental.Kind.SPLITTABLE_DO_FN)
+ public static class OutputRangeTracker extends RestrictionTracker<OffsetRange, Long>
+ implements RestrictionTracker.HasProgress {
+ private OffsetRange range;
+ @Nullable private Long lastClaimedOffset = null;
+ @Nullable private Long lastAttemptedOffset = null;
+
+ public OutputRangeTracker(OffsetRange range) {
+ this.range = checkNotNull(range);
+ lastClaimedOffset = this.range.getFrom();
+ lastAttemptedOffset = lastClaimedOffset;
+ }
+
+ @Override
+ public OffsetRange currentRestriction() {
+ return range;
+ }
+
+ @Override
+ public SplitResult<OffsetRange> trySplit(double fractionOfRemainder) {
+ if (fractionOfRemainder != 0) {
+ return null;
+ }
+ OffsetRange res = new OffsetRange(lastClaimedOffset, range.getTo());
+ this.range = new OffsetRange(range.getFrom(), lastClaimedOffset);
+ return SplitResult.of(range, res);
+ }
+
+ @Override
+ public boolean tryClaim(Long i) {
+ checkArgument(
+ i > lastAttemptedOffset,
+ "Trying to claim offset %s while last attempted was %s",
+ i,
+ lastAttemptedOffset);
+ checkArgument(
+ i >= range.getFrom(), "Trying to claim offset %s before start of the range %s", i, range);
+ lastAttemptedOffset = i;
+ if (i > range.getTo()) {
+ return false;
+ }
+ lastClaimedOffset = i;
+ return true;
+ }
+
+ @Override
+ public void checkDone() throws IllegalStateException {
+ checkState(
+ lastAttemptedOffset >= range.getTo() - 1,
+ "Last attempted offset was %s in range %s, claiming work in (%s, %s] was not attempted",
+ lastAttemptedOffset,
+ range,
+ lastAttemptedOffset,
+ range.getTo());
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("range", range)
+ .add("lastClaimedOffset", lastClaimedOffset)
+ .add("lastAttemptedOffset", lastAttemptedOffset)
+ .toString();
+ }
+
+ @Override
+ public Progress getProgress() {
+ double workRemaining = Math.max(range.getTo() - lastAttemptedOffset, 0);
+ return Progress.from(range.getTo() - range.getFrom() - workRemaining, workRemaining);
+ }
+ }
+
+ private static class PeriodicSequenceFn extends DoFn<SequenceDefinition, Instant> {
+ @GetInitialRestriction
+ public OffsetRange getInitialRange(@Element SequenceDefinition element) {
+ return new OffsetRange(
+ element.first.getMillis() - element.durationMilliSec, element.last.getMillis());
+ }
+
+ @NewTracker
+ public RestrictionTracker<OffsetRange, Long> newTracker(@Restriction OffsetRange restriction) {
+ return new OutputRangeTracker(restriction);
+ }
+
+ @ProcessElement
+ public ProcessContinuation processElement(
+ @Element SequenceDefinition srcElement,
+ OutputReceiver<Instant> out,
+ RestrictionTracker<OffsetRange, Long> restrictionTracker) {
+
+ OffsetRange restriction = restrictionTracker.currentRestriction();
+ Long interval = srcElement.durationMilliSec;
+ Long nextOutput = restriction.getFrom() + interval;
+
+ boolean claimSuccess = true;
+
+ while (claimSuccess && Instant.ofEpochMilli(nextOutput).isBeforeNow()) {
+ claimSuccess = restrictionTracker.tryClaim(nextOutput);
+ if (claimSuccess) {
+ Instant output = Instant.ofEpochMilli(nextOutput);
+ out.outputWithTimestamp(output, output);
+ nextOutput = nextOutput + interval;
+ }
+ }
+
+ ProcessContinuation continuation = ProcessContinuation.stop();
+ if (claimSuccess) {
+ Duration offset = new Duration(Instant.now(), Instant.ofEpochMilli(nextOutput));
+ continuation = ProcessContinuation.resume().withResumeDelay(offset);
+ }
+ return continuation;
+ }
+ }
+
+ @Override
+ public PCollection<Instant> expand(PCollection<SequenceDefinition> input) {
+ return input.apply(ParDo.of(new PeriodicSequenceFn()));
+ }
+}
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PeriodicImpulseTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PeriodicImpulseTest.java
new file mode 100644
index 0000000..2e01cd7
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PeriodicImpulseTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import java.util.ArrayList;
+import org.apache.beam.sdk.testing.NeedsRunner;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.UsesImpulse;
+import org.apache.beam.sdk.testing.UsesStatefulParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for PeriodicImpulse. */
+@RunWith(JUnit4.class)
+public class PeriodicImpulseTest {
+ @Rule public transient TestPipeline p = TestPipeline.create();
+
+ public static class ExtractTsDoFn<InputT> extends DoFn<InputT, KV<InputT, Instant>> {
+ @ProcessElement
+ public void processElement(DoFn<InputT, KV<InputT, Instant>>.ProcessContext c)
+ throws Exception {
+ c.output(KV.of(c.element(), c.timestamp()));
+ }
+ }
+
+ @Test
+ @Category({
+ NeedsRunner.class,
+ UsesImpulse.class,
+ UsesStatefulParDo.class,
+ })
+ public void testOutputsProperElements() {
+ Instant instant = Instant.now();
+
+ Instant startTime = instant.minus(Duration.standardHours(100));
+ long duration = 500;
+ Duration interval = Duration.millis(250);
+ long intervalMillis = interval.getMillis();
+ Instant stopTime = startTime.plus(duration);
+
+ PCollection<KV<Instant, Instant>> result =
+ p.apply(PeriodicImpulse.create().startAt(startTime).stopAt(stopTime).withInterval(interval))
+ .apply(ParDo.of(new ExtractTsDoFn<>()));
+
+ ArrayList<KV<Instant, Instant>> expectedResults =
+ new ArrayList<>((int) (duration / intervalMillis + 1));
+ for (long i = 0; i <= duration; i += intervalMillis) {
+ Instant el = startTime.plus(i);
+ expectedResults.add(KV.of(el, el));
+ }
+
+ PAssert.that(result).containsInAnyOrder(expectedResults);
+
+ p.run().waitUntilFinish();
+ }
+}
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PeriodicSequenceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PeriodicSequenceTest.java
new file mode 100644
index 0000000..cd6bc01
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PeriodicSequenceTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import java.util.ArrayList;
+import org.apache.beam.sdk.testing.NeedsRunner;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.UsesImpulse;
+import org.apache.beam.sdk.testing.UsesStatefulParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for PeriodicSequence. */
+@RunWith(JUnit4.class)
+public class PeriodicSequenceTest {
+ @Rule public transient TestPipeline p = TestPipeline.create();
+
+ public static class ExtractTsDoFn<InputT> extends DoFn<InputT, KV<InputT, Instant>> {
+ @ProcessElement
+ public void processElement(DoFn<InputT, KV<InputT, Instant>>.ProcessContext c)
+ throws Exception {
+ c.output(KV.of(c.element(), c.timestamp()));
+ }
+ }
+
+ @Test
+ @Category({
+ NeedsRunner.class,
+ UsesImpulse.class,
+ UsesStatefulParDo.class,
+ })
+ public void testOutputsProperElements() {
+ Instant instant = Instant.now();
+
+ Instant startTime = instant.minus(Duration.standardHours(100));
+ long duration = 500;
+ Duration interval = Duration.millis(250);
+ long intervalMillis = interval.getMillis();
+ Instant stopTime = startTime.plus(duration);
+
+ PCollection<KV<Instant, Instant>> result =
+ p.apply(
+ Create.<PeriodicSequence.SequenceDefinition>of(
+ new PeriodicSequence.SequenceDefinition(startTime, stopTime, interval)))
+ .apply(PeriodicSequence.create())
+ .apply(ParDo.of(new ExtractTsDoFn<>())); // used to validate timestamp
+
+ ArrayList<KV<Instant, Instant>> expectedResults =
+ new ArrayList<>((int) (duration / intervalMillis + 1));
+ for (long i = 0; i <= duration; i += intervalMillis) {
+ Instant el = startTime.plus(i);
+ expectedResults.add(KV.of(el, el));
+ }
+
+ PAssert.that(result).containsInAnyOrder(expectedResults);
+
+ p.run().waitUntilFinish();
+ }
+}
diff --git a/website/src/documentation/patterns/side-inputs.md b/website/src/documentation/patterns/side-inputs.md
index 875b337..99aca3a 100644
--- a/website/src/documentation/patterns/side-inputs.md
+++ b/website/src/documentation/patterns/side-inputs.md
@@ -79,8 +79,10 @@
1. Apply the side input.
```java
-No sample present.
+{% github_sample /apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java tag:PeriodicallyUpdatingSideInputs
+%}
```
+
```py
{% github_sample /apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/snippets.py tag:SideInputSlowUpdateSnip1
%}