Merge pull request #10622: [BEAM=6857] Fix timermap test to not use TestStream

diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index 57d8c5c..51e68d3 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -4297,12 +4297,7 @@
   public static class TimerFamilyTests extends SharedTestBase implements Serializable {
 
     @Test
-    @Category({
-      NeedsRunner.class,
-      UsesTimersInParDo.class,
-      UsesTestStream.class,
-      UsesTimerMap.class
-    })
+    @Category({NeedsRunner.class, UsesTimersInParDo.class, UsesTimerMap.class})
     public void testTimerFamilyEventTime() throws Exception {
       final String timerFamilyId = "foo";
 
@@ -4332,24 +4327,14 @@
             }
           };
 
-      TestStream<KV<String, Integer>> stream =
-          TestStream.create(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))
-              .advanceWatermarkTo(new Instant(0))
-              .addElements(KV.of("hello", 37))
-              .advanceWatermarkToInfinity();
-
-      PCollection<String> output = pipeline.apply(stream).apply(ParDo.of(fn));
+      PCollection<String> output =
+          pipeline.apply(Create.of(KV.of("hello", 37))).apply(ParDo.of(fn));
       PAssert.that(output).containsInAnyOrder("process", "timer1", "timer2");
       pipeline.run();
     }
 
     @Test
-    @Category({
-      NeedsRunner.class,
-      UsesTimersInParDo.class,
-      UsesTestStream.class,
-      UsesTimerMap.class
-    })
+    @Category({NeedsRunner.class, UsesTimersInParDo.class, UsesTimerMap.class})
     public void testTimerWithMultipleTimerFamily() throws Exception {
       final String timerFamilyId1 = "foo";
       final String timerFamilyId2 = "bar";
@@ -4386,13 +4371,8 @@
             }
           };
 
-      TestStream<KV<String, Integer>> stream =
-          TestStream.create(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))
-              .advanceWatermarkTo(new Instant(0))
-              .addElements(KV.of("hello", 37))
-              .advanceWatermarkToInfinity();
-
-      PCollection<String> output = pipeline.apply(stream).apply(ParDo.of(fn));
+      PCollection<String> output =
+          pipeline.apply(Create.of(KV.of("hello", 37))).apply(ParDo.of(fn));
       PAssert.that(output).containsInAnyOrder("process", "timer", "timer");
       pipeline.run();
     }