Merge pull request #10622: [BEAM=6857] Fix timermap test to not use TestStream
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index 57d8c5c..51e68d3 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -4297,12 +4297,7 @@
public static class TimerFamilyTests extends SharedTestBase implements Serializable {
@Test
- @Category({
- NeedsRunner.class,
- UsesTimersInParDo.class,
- UsesTestStream.class,
- UsesTimerMap.class
- })
+ @Category({NeedsRunner.class, UsesTimersInParDo.class, UsesTimerMap.class})
public void testTimerFamilyEventTime() throws Exception {
final String timerFamilyId = "foo";
@@ -4332,24 +4327,14 @@
}
};
- TestStream<KV<String, Integer>> stream =
- TestStream.create(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))
- .advanceWatermarkTo(new Instant(0))
- .addElements(KV.of("hello", 37))
- .advanceWatermarkToInfinity();
-
- PCollection<String> output = pipeline.apply(stream).apply(ParDo.of(fn));
+ PCollection<String> output =
+ pipeline.apply(Create.of(KV.of("hello", 37))).apply(ParDo.of(fn));
PAssert.that(output).containsInAnyOrder("process", "timer1", "timer2");
pipeline.run();
}
@Test
- @Category({
- NeedsRunner.class,
- UsesTimersInParDo.class,
- UsesTestStream.class,
- UsesTimerMap.class
- })
+ @Category({NeedsRunner.class, UsesTimersInParDo.class, UsesTimerMap.class})
public void testTimerWithMultipleTimerFamily() throws Exception {
final String timerFamilyId1 = "foo";
final String timerFamilyId2 = "bar";
@@ -4386,13 +4371,8 @@
}
};
- TestStream<KV<String, Integer>> stream =
- TestStream.create(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))
- .advanceWatermarkTo(new Instant(0))
- .addElements(KV.of("hello", 37))
- .advanceWatermarkToInfinity();
-
- PCollection<String> output = pipeline.apply(stream).apply(ParDo.of(fn));
+ PCollection<String> output =
+ pipeline.apply(Create.of(KV.of("hello", 37))).apply(ParDo.of(fn));
PAssert.that(output).containsInAnyOrder("process", "timer", "timer");
pipeline.run();
}