[BEAM-9650] Add PeriodicSequence generator. (#11477)

* Add PeriodicSequence generator.
Add java snippet for slowly updating side inputs.

Co-authored-by: Mikhail Gryzykhin <mikhail@apache.org>
diff --git a/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java b/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java
index e100166..3fca68c 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java
@@ -51,10 +51,13 @@
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.PeriodicImpulse;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.Watch;
@@ -68,7 +71,6 @@
 import org.apache.beam.sdk.transforms.windowing.Repeatedly;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.transforms.windowing.WindowFn.MergeContext;
 import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
@@ -785,4 +787,72 @@
 
     }
   }
+
+  public static class PeriodicallyUpdatingSideInputs {
+
+    public static PCollection<Long> main(
+        Pipeline p,
+        Instant startAt,
+        Instant stopAt,
+        Duration interval1,
+        Duration interval2,
+        String fileToRead) {
+      // [START PeriodicallyUpdatingSideInputs]
+      PCollectionView<List<Long>> sideInput =
+          p.apply(
+                  "SIImpulse",
+                  PeriodicImpulse.create()
+                      .startAt(startAt)
+                      .stopAt(stopAt)
+                      .withInterval(interval1)
+                      .applyWindowing())
+              .apply(
+                  "FileToRead",
+                  ParDo.of(
+                      new DoFn<Instant, String>() {
+                        @DoFn.ProcessElement
+                        public void process(@Element Instant notUsed, OutputReceiver<String> o) {
+                          o.output(fileToRead);
+                        }
+                      }))
+              .apply(FileIO.matchAll())
+              .apply(FileIO.readMatches())
+              .apply(TextIO.readFiles())
+              .apply(
+                  ParDo.of(
+                      new DoFn<String, String>() {
+                        @ProcessElement
+                        public void process(@Element String src, OutputReceiver<String> o) {
+                          o.output(src);
+                        }
+                      }))
+              .apply(Combine.globally(Count.<String>combineFn()).withoutDefaults())
+              .apply(View.asList());
+
+      PCollection<Instant> mainInput =
+          p.apply(
+              "MIImpulse",
+              PeriodicImpulse.create()
+                  .startAt(startAt.minus(Duration.standardSeconds(1)))
+                  .stopAt(stopAt.minus(Duration.standardSeconds(1)))
+                  .withInterval(interval2)
+                  .applyWindowing());
+
+      // Consume side input. GenerateSequence generates test data.
+      // Use a real source (like PubSubIO or KafkaIO) in production.
+      PCollection<Long> result =
+          mainInput.apply(
+              "generateOutput",
+              ParDo.of(
+                      new DoFn<Instant, Long>() {
+                        @ProcessElement
+                        public void process(ProcessContext c) {
+                          c.output((long) c.sideInput(sideInput).size());
+                        }
+                      })
+                  .withSideInputs(sideInput));
+      // [END PeriodicallyUpdatingSideInputs]
+      return result;
+    }
+  }
 }
diff --git a/examples/java/src/test/java/org/apache/beam/examples/snippets/SnippetsTest.java b/examples/java/src/test/java/org/apache/beam/examples/snippets/SnippetsTest.java
index 0cf53a3c..b65840f 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/snippets/SnippetsTest.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/snippets/SnippetsTest.java
@@ -17,23 +17,33 @@
  */
 package org.apache.beam.examples.snippets;
 
+import java.io.BufferedWriter;
+import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
+import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.UsesStatefulParDo;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.join.CoGbkResult;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.Files;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
@@ -143,4 +153,40 @@
 
     p.run();
   }
+
+  @Test
+  @Category({NeedsRunner.class, UsesStatefulParDo.class})
+  public void testSlowlyUpdatingSideInputsWindowed() {
+    Instant startAt = Instant.now().minus(Duration.standardMinutes(3));
+    Duration duration = Duration.standardSeconds(10);
+    Instant stopAt = startAt.plus(duration);
+    Duration interval1 = Duration.standardSeconds(1);
+    Duration interval2 = Duration.standardSeconds(1);
+
+    File f = null;
+    try {
+      f = File.createTempFile("testSlowlyUpdatingSIWindowed", "txt");
+      try (BufferedWriter fw = Files.newWriter(f, Charset.forName("UTF-8"))) {
+        fw.append("testdata");
+      }
+    } catch (IOException e) {
+      Assert.fail("failed to create temp file: " + e.toString());
+      throw new RuntimeException("Should never reach here");
+    }
+
+    PCollection<Long> result =
+        Snippets.PeriodicallyUpdatingSideInputs.main(
+            p, startAt, stopAt, interval1, interval2, f.getPath());
+
+    ArrayList<Long> expectedResults = new ArrayList<Long>();
+    expectedResults.add(0L);
+    for (Long i = startAt.getMillis(); i < stopAt.getMillis(); i = i + interval2.getMillis()) {
+      expectedResults.add(1L);
+    }
+
+    PAssert.that(result).containsInAnyOrder(expectedResults);
+
+    p.run().waitUntilFinish();
+    f.deleteOnExit();
+  }
 }
diff --git a/runners/direct-java/build.gradle b/runners/direct-java/build.gradle
index 98dca8e..02eb5bf 100644
--- a/runners/direct-java/build.gradle
+++ b/runners/direct-java/build.gradle
@@ -95,6 +95,8 @@
   group = "Verification"
   description = "Runs tests that require a runner to validate that piplines/transforms work correctly"
 
+  testLogging.showStandardStreams = true
+
   def pipelineOptions = JsonOutput.toJson(["--runner=DirectRunner", "--runnerDeterminedSharding=false"])
   systemProperty "beamTestPipelineOptions", pipelineOptions
 
@@ -112,6 +114,10 @@
     excludeCategories "org.apache.beam.sdk.testing.UsesCrossLanguageTransforms"
     excludeCategories 'org.apache.beam.sdk.testing.UsesBundleFinalizer'
   }
+  testLogging {
+    outputs.upToDateWhen {false}
+    showStandardStreams = true
+  }
 }
 
 // NOTE: This will also run 'NeedsRunner' tests, which are run in the :needsRunnerTests task as well.
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicImpulse.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicImpulse.java
new file mode 100644
index 0000000..fc836bc
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicImpulse.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * A {@link PTransform} which produces a sequence of elements at fixed runtime intervals.
+ *
+ * <p>If applyWindowing() is specified, each element will be assigned to its own fixed window.
+ *
+ * <p>See {@link PeriodicSequence}.
+ */
+public class PeriodicImpulse extends PTransform<PBegin, PCollection<Instant>> {
+
+  Instant startTimestamp = Instant.now();
+  Instant stopTimestamp = BoundedWindow.TIMESTAMP_MAX_VALUE;
+  Duration fireInterval = Duration.standardMinutes(1);
+  boolean applyWindowing = false;
+
+  private PeriodicImpulse() {}
+
+  public static PeriodicImpulse create() {
+    return new PeriodicImpulse();
+  }
+
+  public PeriodicImpulse startAt(Instant startTime) {
+    this.startTimestamp = startTime;
+    return this;
+  }
+
+  public PeriodicImpulse stopAt(Instant stopTime) {
+    this.stopTimestamp = stopTime;
+    return this;
+  }
+
+  public PeriodicImpulse withInterval(Duration interval) {
+    this.fireInterval = interval;
+    return this;
+  }
+
+  public PeriodicImpulse applyWindowing() {
+    this.applyWindowing = true;
+    return this;
+  }
+
+  @Override
+  public PCollection<Instant> expand(PBegin input) {
+    PCollection<Instant> result =
+        input
+            .apply(
+                Create.<PeriodicSequence.SequenceDefinition>of(
+                    new PeriodicSequence.SequenceDefinition(
+                        startTimestamp, stopTimestamp, fireInterval)))
+            .apply(PeriodicSequence.create());
+
+    if (this.applyWindowing) {
+      result =
+          result.apply(
+              Window.<Instant>into(FixedWindows.of(Duration.millis(fireInterval.getMillis()))));
+    }
+
+    return result;
+  }
+}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicSequence.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicSequence.java
new file mode 100644
index 0000000..bcf1b57
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicSequence.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import java.util.Objects;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.schemas.JavaFieldSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * A {@link PTransform} which generates a sequence of timestamped elements at given runtime
+ * intervals.
+ *
+ * <p>Transform assigns each element some timestamp and will only output element when worker clock
+ * reach given timestamp. Transform will not output elements prior to target time. Transform can
+ * output elements at any time after target time.
+ */
+@Experimental(Experimental.Kind.SPLITTABLE_DO_FN)
+public class PeriodicSequence
+    extends PTransform<PCollection<PeriodicSequence.SequenceDefinition>, PCollection<Instant>> {
+
+  @DefaultSchema(JavaFieldSchema.class)
+  public static class SequenceDefinition {
+    public Instant first;
+    public Instant last;
+    public Long durationMilliSec;
+
+    public SequenceDefinition() {}
+
+    public SequenceDefinition(Instant first, Instant last, Duration duration) {
+      this.first = first;
+      this.last = last;
+      this.durationMilliSec = duration.getMillis();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) {
+        return true;
+      }
+
+      if (obj == null || obj.getClass() != this.getClass()) {
+        return false;
+      }
+
+      SequenceDefinition src = (SequenceDefinition) obj;
+      return src.first.equals(this.first)
+          && src.last.equals(this.last)
+          && src.durationMilliSec.equals(this.durationMilliSec);
+    }
+
+    @Override
+    public int hashCode() {
+      int result = Objects.hash(first, last, durationMilliSec);
+      return result;
+    }
+  }
+
+  private PeriodicSequence() {}
+
+  public static PeriodicSequence create() {
+    return new PeriodicSequence();
+  }
+
+  @Experimental(Experimental.Kind.SPLITTABLE_DO_FN)
+  public static class OutputRangeTracker extends RestrictionTracker<OffsetRange, Long>
+      implements RestrictionTracker.HasProgress {
+    private OffsetRange range;
+    @Nullable private Long lastClaimedOffset = null;
+    @Nullable private Long lastAttemptedOffset = null;
+
+    public OutputRangeTracker(OffsetRange range) {
+      this.range = checkNotNull(range);
+      lastClaimedOffset = this.range.getFrom();
+      lastAttemptedOffset = lastClaimedOffset;
+    }
+
+    @Override
+    public OffsetRange currentRestriction() {
+      return range;
+    }
+
+    @Override
+    public SplitResult<OffsetRange> trySplit(double fractionOfRemainder) {
+      if (fractionOfRemainder != 0) {
+        return null;
+      }
+      OffsetRange res = new OffsetRange(lastClaimedOffset, range.getTo());
+      this.range = new OffsetRange(range.getFrom(), lastClaimedOffset);
+      return SplitResult.of(range, res);
+    }
+
+    @Override
+    public boolean tryClaim(Long i) {
+      checkArgument(
+          i > lastAttemptedOffset,
+          "Trying to claim offset %s while last attempted was %s",
+          i,
+          lastAttemptedOffset);
+      checkArgument(
+          i >= range.getFrom(), "Trying to claim offset %s before start of the range %s", i, range);
+      lastAttemptedOffset = i;
+      if (i > range.getTo()) {
+        return false;
+      }
+      lastClaimedOffset = i;
+      return true;
+    }
+
+    @Override
+    public void checkDone() throws IllegalStateException {
+      checkState(
+          lastAttemptedOffset >= range.getTo() - 1,
+          "Last attempted offset was %s in range %s, claiming work in (%s, %s] was not attempted",
+          lastAttemptedOffset,
+          range,
+          lastAttemptedOffset,
+          range.getTo());
+    }
+
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper(this)
+          .add("range", range)
+          .add("lastClaimedOffset", lastClaimedOffset)
+          .add("lastAttemptedOffset", lastAttemptedOffset)
+          .toString();
+    }
+
+    @Override
+    public Progress getProgress() {
+      double workRemaining = Math.max(range.getTo() - lastAttemptedOffset, 0);
+      return Progress.from(range.getTo() - range.getFrom() - workRemaining, workRemaining);
+    }
+  }
+
+  private static class PeriodicSequenceFn extends DoFn<SequenceDefinition, Instant> {
+    @GetInitialRestriction
+    public OffsetRange getInitialRange(@Element SequenceDefinition element) {
+      return new OffsetRange(
+          element.first.getMillis() - element.durationMilliSec, element.last.getMillis());
+    }
+
+    @NewTracker
+    public RestrictionTracker<OffsetRange, Long> newTracker(@Restriction OffsetRange restriction) {
+      return new OutputRangeTracker(restriction);
+    }
+
+    @ProcessElement
+    public ProcessContinuation processElement(
+        @Element SequenceDefinition srcElement,
+        OutputReceiver<Instant> out,
+        RestrictionTracker<OffsetRange, Long> restrictionTracker) {
+
+      OffsetRange restriction = restrictionTracker.currentRestriction();
+      Long interval = srcElement.durationMilliSec;
+      Long nextOutput = restriction.getFrom() + interval;
+
+      boolean claimSuccess = true;
+
+      while (claimSuccess && Instant.ofEpochMilli(nextOutput).isBeforeNow()) {
+        claimSuccess = restrictionTracker.tryClaim(nextOutput);
+        if (claimSuccess) {
+          Instant output = Instant.ofEpochMilli(nextOutput);
+          out.outputWithTimestamp(output, output);
+          nextOutput = nextOutput + interval;
+        }
+      }
+
+      ProcessContinuation continuation = ProcessContinuation.stop();
+      if (claimSuccess) {
+        Duration offset = new Duration(Instant.now(), Instant.ofEpochMilli(nextOutput));
+        continuation = ProcessContinuation.resume().withResumeDelay(offset);
+      }
+      return continuation;
+    }
+  }
+
+  @Override
+  public PCollection<Instant> expand(PCollection<SequenceDefinition> input) {
+    return input.apply(ParDo.of(new PeriodicSequenceFn()));
+  }
+}
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PeriodicImpulseTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PeriodicImpulseTest.java
new file mode 100644
index 0000000..2e01cd7
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PeriodicImpulseTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import java.util.ArrayList;
+import org.apache.beam.sdk.testing.NeedsRunner;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.UsesImpulse;
+import org.apache.beam.sdk.testing.UsesStatefulParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for PeriodicImpulse. */
+@RunWith(JUnit4.class)
+public class PeriodicImpulseTest {
+  @Rule public transient TestPipeline p = TestPipeline.create();
+
+  public static class ExtractTsDoFn<InputT> extends DoFn<InputT, KV<InputT, Instant>> {
+    @ProcessElement
+    public void processElement(DoFn<InputT, KV<InputT, Instant>>.ProcessContext c)
+        throws Exception {
+      c.output(KV.of(c.element(), c.timestamp()));
+    }
+  }
+
+  @Test
+  @Category({
+    NeedsRunner.class,
+    UsesImpulse.class,
+    UsesStatefulParDo.class,
+  })
+  public void testOutputsProperElements() {
+    Instant instant = Instant.now();
+
+    Instant startTime = instant.minus(Duration.standardHours(100));
+    long duration = 500;
+    Duration interval = Duration.millis(250);
+    long intervalMillis = interval.getMillis();
+    Instant stopTime = startTime.plus(duration);
+
+    PCollection<KV<Instant, Instant>> result =
+        p.apply(PeriodicImpulse.create().startAt(startTime).stopAt(stopTime).withInterval(interval))
+            .apply(ParDo.of(new ExtractTsDoFn<>()));
+
+    ArrayList<KV<Instant, Instant>> expectedResults =
+        new ArrayList<>((int) (duration / intervalMillis + 1));
+    for (long i = 0; i <= duration; i += intervalMillis) {
+      Instant el = startTime.plus(i);
+      expectedResults.add(KV.of(el, el));
+    }
+
+    PAssert.that(result).containsInAnyOrder(expectedResults);
+
+    p.run().waitUntilFinish();
+  }
+}
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PeriodicSequenceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PeriodicSequenceTest.java
new file mode 100644
index 0000000..cd6bc01
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PeriodicSequenceTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import java.util.ArrayList;
+import org.apache.beam.sdk.testing.NeedsRunner;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.UsesImpulse;
+import org.apache.beam.sdk.testing.UsesStatefulParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for PeriodicSequence. */
+@RunWith(JUnit4.class)
+public class PeriodicSequenceTest {
+  @Rule public transient TestPipeline p = TestPipeline.create();
+
+  public static class ExtractTsDoFn<InputT> extends DoFn<InputT, KV<InputT, Instant>> {
+    @ProcessElement
+    public void processElement(DoFn<InputT, KV<InputT, Instant>>.ProcessContext c)
+        throws Exception {
+      c.output(KV.of(c.element(), c.timestamp()));
+    }
+  }
+
+  @Test
+  @Category({
+    NeedsRunner.class,
+    UsesImpulse.class,
+    UsesStatefulParDo.class,
+  })
+  public void testOutputsProperElements() {
+    Instant instant = Instant.now();
+
+    Instant startTime = instant.minus(Duration.standardHours(100));
+    long duration = 500;
+    Duration interval = Duration.millis(250);
+    long intervalMillis = interval.getMillis();
+    Instant stopTime = startTime.plus(duration);
+
+    PCollection<KV<Instant, Instant>> result =
+        p.apply(
+                Create.<PeriodicSequence.SequenceDefinition>of(
+                    new PeriodicSequence.SequenceDefinition(startTime, stopTime, interval)))
+            .apply(PeriodicSequence.create())
+            .apply(ParDo.of(new ExtractTsDoFn<>())); // used to validate timestamp
+
+    ArrayList<KV<Instant, Instant>> expectedResults =
+        new ArrayList<>((int) (duration / intervalMillis + 1));
+    for (long i = 0; i <= duration; i += intervalMillis) {
+      Instant el = startTime.plus(i);
+      expectedResults.add(KV.of(el, el));
+    }
+
+    PAssert.that(result).containsInAnyOrder(expectedResults);
+
+    p.run().waitUntilFinish();
+  }
+}
diff --git a/website/src/documentation/patterns/side-inputs.md b/website/src/documentation/patterns/side-inputs.md
index 875b337..99aca3a 100644
--- a/website/src/documentation/patterns/side-inputs.md
+++ b/website/src/documentation/patterns/side-inputs.md
@@ -79,8 +79,10 @@
 1. Apply the side input.
 
 ```java
-No sample present.
+{% github_sample /apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java tag:PeriodicallyUpdatingSideInputs
+%}
 ```
+
 ```py
 {% github_sample /apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/snippets.py tag:SideInputSlowUpdateSnip1
 %}