[BEAM-11666]  flake on RecordingManagerTest (#15118)

diff --git a/sdks/python/apache_beam/runners/interactive/recording_manager_test.py b/sdks/python/apache_beam/runners/interactive/recording_manager_test.py
index ca44ca3..7b7a6e9 100644
--- a/sdks/python/apache_beam/runners/interactive/recording_manager_test.py
+++ b/sdks/python/apache_beam/runners/interactive/recording_manager_test.py
@@ -437,6 +437,21 @@
         set(pipeline_instrument.cache_key(pc) for pc in (elems, squares)))
 
   def test_clear(self):
+    p1 = beam.Pipeline(InteractiveRunner())
+    elems_1 = p1 | 'elems 1' >> beam.Create([0, 1, 2])
+
+    ib.watch(locals())
+    ie.current_env().track_user_pipelines()
+
+    recording_manager = RecordingManager(p1)
+    recording = recording_manager.record([elems_1], max_n=3, max_duration=500)
+    recording.wait_until_finish()
+    record_describe = recording_manager.describe()
+    self.assertGreater(record_describe['size'], 0)
+    recording_manager.clear()
+    self.assertEqual(recording_manager.describe()['size'], 0)
+
+  def test_clear_specific_pipeline(self):
     """Tests that clear can empty the cache for a specific pipeline."""
 
     # Create two pipelines so we can check that clearing the cache won't clear
@@ -461,16 +476,18 @@
     rm_2 = RecordingManager(p2)
     recording = rm_2.record([elems_2], max_n=3, max_duration=500)
     recording.wait_until_finish()
-
     # Assert that clearing only one recording clears that recording.
-    self.assertGreater(rm_1.describe()['size'], 0)
-    self.assertGreater(rm_2.describe()['size'], 0)
-    rm_1.clear()
-    self.assertEqual(rm_1.describe()['size'], 0)
-    self.assertGreater(rm_2.describe()['size'], 0)
+    if rm_1.describe()['state'] == PipelineState.STOPPED \
+            and rm_2.describe()['state'] == PipelineState.STOPPED:
 
-    rm_2.clear()
-    self.assertEqual(rm_2.describe()['size'], 0)
+      self.assertGreater(rm_1.describe()['size'], 0)
+      self.assertGreater(rm_2.describe()['size'], 0)
+      rm_1.clear()
+      self.assertEqual(rm_1.describe()['size'], 0)
+      self.assertGreater(rm_2.describe()['size'], 0)
+
+      rm_2.clear()
+      self.assertEqual(rm_2.describe()['size'], 0)
 
   def test_record_pipeline(self):
     # Add the TestStream so that it can be cached.