[BEAM-11666] flake on RecordingManagerTest (#15118)
diff --git a/sdks/python/apache_beam/runners/interactive/recording_manager_test.py b/sdks/python/apache_beam/runners/interactive/recording_manager_test.py
index ca44ca3..7b7a6e9 100644
--- a/sdks/python/apache_beam/runners/interactive/recording_manager_test.py
+++ b/sdks/python/apache_beam/runners/interactive/recording_manager_test.py
@@ -437,6 +437,21 @@
set(pipeline_instrument.cache_key(pc) for pc in (elems, squares)))
def test_clear(self):
+ p1 = beam.Pipeline(InteractiveRunner())
+ elems_1 = p1 | 'elems 1' >> beam.Create([0, 1, 2])
+
+ ib.watch(locals())
+ ie.current_env().track_user_pipelines()
+
+ recording_manager = RecordingManager(p1)
+ recording = recording_manager.record([elems_1], max_n=3, max_duration=500)
+ recording.wait_until_finish()
+ record_describe = recording_manager.describe()
+ self.assertGreater(record_describe['size'], 0)
+ recording_manager.clear()
+ self.assertEqual(recording_manager.describe()['size'], 0)
+
+ def test_clear_specific_pipeline(self):
"""Tests that clear can empty the cache for a specific pipeline."""
# Create two pipelines so we can check that clearing the cache won't clear
@@ -461,16 +476,18 @@
rm_2 = RecordingManager(p2)
recording = rm_2.record([elems_2], max_n=3, max_duration=500)
recording.wait_until_finish()
-
# Assert that clearing only one recording clears that recording.
- self.assertGreater(rm_1.describe()['size'], 0)
- self.assertGreater(rm_2.describe()['size'], 0)
- rm_1.clear()
- self.assertEqual(rm_1.describe()['size'], 0)
- self.assertGreater(rm_2.describe()['size'], 0)
+ if rm_1.describe()['state'] == PipelineState.STOPPED \
+ and rm_2.describe()['state'] == PipelineState.STOPPED:
- rm_2.clear()
- self.assertEqual(rm_2.describe()['size'], 0)
+ self.assertGreater(rm_1.describe()['size'], 0)
+ self.assertGreater(rm_2.describe()['size'], 0)
+ rm_1.clear()
+ self.assertEqual(rm_1.describe()['size'], 0)
+ self.assertGreater(rm_2.describe()['size'], 0)
+
+ rm_2.clear()
+ self.assertEqual(rm_2.describe()['size'], 0)
def test_record_pipeline(self):
# Add the TestStream so that it can be cached.