Merge pull request #15490 from KevinGG/beam_sql_out_cache

[BEAM-10708] Introspect beam_sql output
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index 287ec49..c83bc5c 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -96,6 +96,7 @@
 from apache_beam.utils import subprocess_server
 from apache_beam.utils.annotations import deprecated
 from apache_beam.utils.interactive_utils import alter_label_if_ipython
+from apache_beam.utils.interactive_utils import is_in_ipython
 
 if TYPE_CHECKING:
   from types import TracebackType
@@ -565,7 +566,9 @@
           shutil.rmtree(tmpdir)
       return self.runner.run_pipeline(self, self._options)
     finally:
-      shutil.rmtree(self.local_tempdir, ignore_errors=True)
+      if not is_in_ipython():
+        shutil.rmtree(self.local_tempdir, ignore_errors=True)
+      # Otherwise, Interactive Beam handles the cleanup.
 
   def __enter__(self):
     # type: () -> Pipeline
diff --git a/sdks/python/apache_beam/runners/interactive/augmented_pipeline.py b/sdks/python/apache_beam/runners/interactive/augmented_pipeline.py
index 3243163..37f914b 100644
--- a/sdks/python/apache_beam/runners/interactive/augmented_pipeline.py
+++ b/sdks/python/apache_beam/runners/interactive/augmented_pipeline.py
@@ -86,9 +86,7 @@
         if (isinstance(val, beam.pvalue.PCollection) and
             val.pipeline is self._user_pipeline and
             (not self._pcolls or val in self._pcolls)):
-          pcoll_id = self._context.pcollections.get_id(val)
           c[val] = Cacheable(
-              pcoll_id=pcoll_id,
               var=key,
               pcoll=val,
               version=str(id(val)),
diff --git a/sdks/python/apache_beam/runners/interactive/background_caching_job.py b/sdks/python/apache_beam/runners/interactive/background_caching_job.py
index c08c5d6..5219538 100644
--- a/sdks/python/apache_beam/runners/interactive/background_caching_job.py
+++ b/sdks/python/apache_beam/runners/interactive/background_caching_job.py
@@ -44,6 +44,7 @@
 
 import apache_beam as beam
 from apache_beam.runners.interactive import interactive_environment as ie
+from apache_beam.runners.interactive import utils
 from apache_beam.runners.interactive.caching import streaming_cache
 from apache_beam.runners.runner import PipelineState
 
@@ -221,10 +222,9 @@
   Throughout the check, if source-to-cache has changed from the last check, it
   also cleans up the invalidated cache early on.
   """
-  from apache_beam.runners.interactive import pipeline_instrument as instr
   # TODO(BEAM-8335): we temporarily only cache replaceable unbounded sources.
   # Add logic for other cacheable sources here when they are available.
-  has_cache = instr.has_unbounded_sources(user_pipeline)
+  has_cache = utils.has_unbounded_sources(user_pipeline)
   if has_cache:
     if not isinstance(ie.current_env().get_cache_manager(user_pipeline,
                                                          create_if_absent=True),
@@ -331,10 +331,9 @@
 
   A signature is a str representation of urn and payload of a source.
   """
-  from apache_beam.runners.interactive import pipeline_instrument as instr
   # TODO(BEAM-8335): we temporarily only cache replaceable unbounded sources.
   # Add logic for other cacheable sources here when they are available.
-  unbounded_sources_as_applied_transforms = instr.unbounded_sources(
+  unbounded_sources_as_applied_transforms = utils.unbounded_sources(
       user_pipeline)
   unbounded_sources_as_ptransforms = set(
       map(lambda x: x.transform, unbounded_sources_as_applied_transforms))
diff --git a/sdks/python/apache_beam/runners/interactive/caching/cacheable.py b/sdks/python/apache_beam/runners/interactive/caching/cacheable.py
index 96663a7..f69324e 100644
--- a/sdks/python/apache_beam/runners/interactive/caching/cacheable.py
+++ b/sdks/python/apache_beam/runners/interactive/caching/cacheable.py
@@ -26,24 +26,22 @@
 from dataclasses import dataclass
 
 import apache_beam as beam
-from apache_beam.runners.interactive.utils import obfuscate
 
 
 @dataclass
 class Cacheable:
-  pcoll_id: str
   var: str
   version: str
-  pcoll: beam.pvalue.PCollection
   producer_version: str
+  pcoll: beam.pvalue.PCollection
 
   def __hash__(self):
-    return hash((
-        self.pcoll_id,
-        self.var,
-        self.version,
-        self.pcoll,
-        self.producer_version))
+    return hash((self.var, self.version, self.producer_version, self.pcoll))
+
+  @staticmethod
+  def from_pcoll(
+      pcoll_name: str, pcoll: beam.pvalue.PCollection) -> 'Cacheable':
+    return Cacheable(pcoll_name, str(id(pcoll)), str(id(pcoll.producer)), pcoll)
 
   def to_key(self):
     return CacheKey(
@@ -55,22 +53,50 @@
 
 @dataclass
 class CacheKey:
+  """The identifier of a cacheable PCollection in cache.
+
+  It contains 4 stringified components:
+  var: The obfuscated variable name of the PCollection.
+  version: The id() of the PCollection.
+  producer_version: The id() of the producer of the PCollection.
+  pipeline_id: The id() of the pipeline the PCollection belongs to.
+  """
   var: str
   version: str
   producer_version: str
   pipeline_id: str
 
   def __post_init__(self):
+    from apache_beam.runners.interactive.utils import obfuscate
     # Normalize arbitrary variable name to a fixed length hex str.
     self.var = obfuscate(self.var)[:10]
 
+  def __hash__(self):
+    return hash(
+        (self.var, self.version, self.producer_version, self.pipeline_id))
+
   @staticmethod
-  def from_str(r):
+  def from_str(r: str) -> 'CacheKey':
     r_split = r.split('-')
     ck = CacheKey(*r_split)
+    # Avoid double obfuscation.
     ck.var = r_split[0]
     return ck
 
-  def __repr__(self):
+  @staticmethod
+  def from_pcoll(pcoll_name: str, pcoll: beam.pvalue.PCollection) -> 'CacheKey':
+    return CacheKey(
+        pcoll_name,
+        str(id(pcoll)),
+        str(id(pcoll.producer)),
+        str(id(pcoll.pipeline)))
+
+  def to_str(self):
     return '-'.join(
         [self.var, self.version, self.producer_version, self.pipeline_id])
+
+  def __repr__(self):
+    return self.to_str()
+
+  def __str__(self):
+    return self.to_str()
diff --git a/sdks/python/apache_beam/runners/interactive/caching/read_cache.py b/sdks/python/apache_beam/runners/interactive/caching/read_cache.py
index b23681d..cf0859d 100644
--- a/sdks/python/apache_beam/runners/interactive/caching/read_cache.py
+++ b/sdks/python/apache_beam/runners/interactive/caching/read_cache.py
@@ -27,6 +27,7 @@
 from apache_beam.portability.api import beam_runner_api_pb2
 from apache_beam.runners.interactive import cache_manager as cache
 from apache_beam.runners.interactive.caching.cacheable import Cacheable
+from apache_beam.runners.interactive.caching.reify import unreify_from_cache
 from apache_beam.runners.pipeline_context import PipelineContext
 from apache_beam.transforms.ptransform import PTransform
 
@@ -45,7 +46,6 @@
     self._cache_manager = cache_manager
     self._cacheable = cacheable
     self._key = repr(cacheable.to_key())
-    self._label = '{}{}'.format('_cache_', self._key)
 
   def read_cache(self) -> Tuple[str, str]:
     """Reads cache of the cacheable PCollection and wires the cache into the
@@ -119,28 +119,22 @@
 
   def _build_runner_api_template(
       self) -> Tuple[beam_runner_api_pb2.Pipeline, beam.pvalue.PCollection]:
-    transform = _ReadCacheTransform(self._cache_manager, self._key, self._label)
+    transform = _ReadCacheTransform(self._cache_manager, self._key)
     tmp_pipeline = beam.Pipeline()
     tmp_pipeline.component_id_map = self._context.component_id_map
-    read_output = tmp_pipeline | 'source' + self._label >> transform
+    read_output = tmp_pipeline | 'source_cache_' >> transform
     return tmp_pipeline.to_runner_api(), read_output
 
 
 class _ReadCacheTransform(PTransform):
   """A composite transform encapsulates reading cache of PCollections.
   """
-  def __init__(self, cache_manager: cache.CacheManager, key: str, label: str):
+  def __init__(self, cache_manager: cache.CacheManager, key: str):
     self._cache_manager = cache_manager
     self._key = key
-    self._label = label
 
   def expand(self, pcoll: beam.pvalue.PCollection) -> beam.pvalue.PCollection:
-    class Unreify(beam.DoFn):
-      def process(self, e):
-        yield e.windowed_value
-
-    return (
-        pcoll.pipeline
-        |
-        'read' + self._label >> cache.ReadCache(self._cache_manager, self._key)
-        | 'unreify' + self._label >> beam.ParDo(Unreify()))
+    return unreify_from_cache(
+        pipeline=pcoll.pipeline,
+        cache_key=self._key,
+        cache_manager=self._cache_manager)
diff --git a/sdks/python/apache_beam/runners/interactive/caching/read_cache_test.py b/sdks/python/apache_beam/runners/interactive/caching/read_cache_test.py
index aa2ed20..d32c265 100644
--- a/sdks/python/apache_beam/runners/interactive/caching/read_cache_test.py
+++ b/sdks/python/apache_beam/runners/interactive/caching/read_cache_test.py
@@ -69,9 +69,8 @@
     actual_pipeline = pipeline_proto
 
     # Read cache directly on the pipeline instance.
-    label = '{}{}'.format('_cache_', key)
-    transform = read_cache._ReadCacheTransform(aug_p._cache_manager, key, label)
-    p | 'source' + label >> transform
+    transform = read_cache._ReadCacheTransform(aug_p._cache_manager, key)
+    p | 'source_cache_' + key >> transform
     expected_pipeline = p.to_runner_api()
 
     # This rougly checks the equivalence between two protos, not detailed
diff --git a/sdks/python/apache_beam/runners/interactive/caching/reify.py b/sdks/python/apache_beam/runners/interactive/caching/reify.py
new file mode 100644
index 0000000..ce82785
--- /dev/null
+++ b/sdks/python/apache_beam/runners/interactive/caching/reify.py
@@ -0,0 +1,117 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Module for transforms that reifies and unreifies PCollection values with
+window info.
+
+For internal use only; no backwards-compatibility guarantees.
+"""
+
+# pytype: skip-file
+
+from typing import Optional
+
+import apache_beam as beam
+from apache_beam.runners.interactive import cache_manager as cache
+from apache_beam.testing import test_stream
+from apache_beam.transforms.window import WindowedValue
+
+READ_CACHE = 'ReadCache_'
+WRITE_CACHE = 'WriteCache_'
+
+
+class Reify(beam.DoFn):
+  """Reifies elements with window info into windowed values.
+
+  Internally used to capture window info with each element into cache for
+  replayability.
+  """
+  def process(
+      self,
+      e,
+      w=beam.DoFn.WindowParam,
+      p=beam.DoFn.PaneInfoParam,
+      t=beam.DoFn.TimestampParam):
+    yield test_stream.WindowedValueHolder(WindowedValue(e, t, [w], p))
+
+
+class Unreify(beam.DoFn):
+  """Unreifies elements from windowed values.
+
+  Cached values are elements with window info. This unpacks the elements.
+  """
+  def process(self, e):
+    # Row coder was used when encoding windowed values; others are skipped.
+    if isinstance(e, beam.Row) and hasattr(e, 'windowed_value'):
+      yield e.windowed_value
+
+
+def reify_to_cache(
+    pcoll: beam.pvalue.PCollection,
+    cache_key: str,
+    cache_manager: cache.CacheManager,
+    reify_label: Optional[str] = None,
+    write_cache_label: Optional[str] = None,
+    is_capture: bool = False) -> beam.pvalue.PValue:
+  """Reifies elements into windowed values and write to cache.
+
+  Args:
+    pcoll: The PCollection to be cached.
+    cache_key: The key of the cache.
+    cache_manager: The cache manager to manage the cache.
+    reify_label: (optional) A transform label for the Reify transform.
+    write_cache_label: (optional) A transform label for the cache-writing
+      transform.
+    is_capture: Whether the cache is capturing a record of recordable sources.
+  """
+  if not reify_label:
+    reify_label = '{}{}{}'.format('ReifyBefore_', WRITE_CACHE, cache_key)
+  if not write_cache_label:
+    write_cache_label = '{}{}'.format(WRITE_CACHE, cache_key)
+  return (
+      pcoll | reify_label >> beam.ParDo(Reify())
+      | write_cache_label >> cache.WriteCache(
+          cache_manager, cache_key, is_capture=is_capture))
+
+
+def unreify_from_cache(
+    pipeline: beam.Pipeline,
+    cache_key: str,
+    cache_manager: cache.CacheManager,
+    element_type: Optional[type] = None,
+    source_label: Optional[str] = None,
+    unreify_label: Optional[str] = None) -> beam.pvalue.PCollection:
+  """Reads from cache and unreifies elements from windowed values.
+
+  pipeline: The pipeline that's reading from the cache.
+  cache_key: The key of the cache.
+  cache_manager: The cache manager to manage the cache.
+  element_type: (optional) The element type of the PCollection's elements.
+  source_label: (optional) A transform label for the cache-reading transform.
+  unreify_label: (optional) A transform label for the Unreify transform.
+  """
+  if not source_label:
+    source_label = '{}{}'.format(READ_CACHE, cache_key)
+  if not unreify_label:
+    unreify_label = '{}{}{}'.format('UnreifyAfter_', READ_CACHE, cache_key)
+  read_cache = pipeline | source_label >> cache.ReadCache(
+      cache_manager, cache_key)
+  if element_type:
+    # If the PCollection is schema-aware, explicitly sets the output types.
+    return read_cache | unreify_label >> beam.ParDo(
+        Unreify()).with_output_types(element_type)
+  return read_cache | unreify_label >> beam.ParDo(Unreify())
diff --git a/sdks/python/apache_beam/runners/interactive/caching/write_cache.py b/sdks/python/apache_beam/runners/interactive/caching/write_cache.py
index 94effdf..d398e70 100644
--- a/sdks/python/apache_beam/runners/interactive/caching/write_cache.py
+++ b/sdks/python/apache_beam/runners/interactive/caching/write_cache.py
@@ -27,10 +27,9 @@
 from apache_beam.portability.api import beam_runner_api_pb2
 from apache_beam.runners.interactive import cache_manager as cache
 from apache_beam.runners.interactive.caching.cacheable import Cacheable
+from apache_beam.runners.interactive.caching.reify import reify_to_cache
 from apache_beam.runners.pipeline_context import PipelineContext
-from apache_beam.testing import test_stream
 from apache_beam.transforms.ptransform import PTransform
-from apache_beam.transforms.window import WindowedValue
 
 
 class WriteCache:
@@ -46,8 +45,7 @@
     self._context = context
     self._cache_manager = cache_manager
     self._cacheable = cacheable
-    self._key = repr(cacheable.to_key())
-    self._label = '{}{}'.format('_cache_', self._key)
+    self._key = cacheable.to_key().to_str()
 
   def write_cache(self) -> None:
     """Writes cache for the cacheable PCollection that is being computed.
@@ -129,35 +127,21 @@
   def _build_runner_api_template(
       self) -> Tuple[beam_runner_api_pb2.Pipeline, '_PCollectionPlaceHolder']:
     pph = _PCollectionPlaceHolder(self._cacheable.pcoll, self._context)
-    transform = _WriteCacheTransform(
-        self._cache_manager, self._key, self._label)
-    _ = pph.placeholder_pcoll | 'sink' + self._label >> transform
+    transform = _WriteCacheTransform(self._cache_manager, self._key)
+    _ = pph.placeholder_pcoll | 'sink_cache_' + self._key >> transform
     return pph.placeholder_pcoll.pipeline.to_runner_api(), pph
 
 
 class _WriteCacheTransform(PTransform):
   """A composite transform encapsulates writing cache for PCollections.
   """
-  def __init__(self, cache_manager: cache.CacheManager, key: str, label: str):
+  def __init__(self, cache_manager: cache.CacheManager, key: str):
     self._cache_manager = cache_manager
     self._key = key
-    self._label = label
 
-  def expand(self, pcoll: beam.pvalue.PCollection) -> beam.pvalue.PCollection:
-    class Reify(beam.DoFn):
-      def process(
-          self,
-          e,
-          w=beam.DoFn.WindowParam,
-          p=beam.DoFn.PaneInfoParam,
-          t=beam.DoFn.TimestampParam):
-        yield test_stream.WindowedValueHolder(WindowedValue(e, t, [w], p))
-
-    return (
-        pcoll
-        | 'reify' + self._label >> beam.ParDo(Reify())
-        | 'write' + self._label >> cache.WriteCache(
-            self._cache_manager, self._key, is_capture=False))
+  def expand(self, pcoll: beam.pvalue.PCollection) -> beam.pvalue.PValue:
+    return reify_to_cache(
+        pcoll=pcoll, cache_key=self._key, cache_manager=self._cache_manager)
 
 
 class _PCollectionPlaceHolder:
diff --git a/sdks/python/apache_beam/runners/interactive/caching/write_cache_test.py b/sdks/python/apache_beam/runners/interactive/caching/write_cache_test.py
index af8dc7b..588efdc 100644
--- a/sdks/python/apache_beam/runners/interactive/caching/write_cache_test.py
+++ b/sdks/python/apache_beam/runners/interactive/caching/write_cache_test.py
@@ -57,10 +57,8 @@
     actual_pipeline = pipeline_proto
 
     # Write cache directly on the piepline instance.
-    label = '{}{}'.format('_cache_', key)
-    transform = write_cache._WriteCacheTransform(
-        aug_p._cache_manager, key, label)
-    _ = pcoll | 'sink' + label >> transform
+    transform = write_cache._WriteCacheTransform(aug_p._cache_manager, key)
+    _ = pcoll | 'sink_cache_' + key >> transform
     expected_pipeline = p.to_runner_api()
 
     assert_pipeline_proto_equal(self, expected_pipeline, actual_pipeline)
diff --git a/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py b/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py
index 89800d8..9e071fe 100644
--- a/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py
+++ b/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py
@@ -30,6 +30,7 @@
 
 from dateutil import tz
 
+import apache_beam as beam
 from apache_beam.runners.interactive import interactive_environment as ie
 from apache_beam.runners.interactive.utils import elements_to_df
 from apache_beam.transforms.window import GlobalWindow
@@ -236,6 +237,22 @@
   return None
 
 
+def visualize_computed_pcoll(
+    pcoll_name: str, pcoll: beam.pvalue.PCollection) -> None:
+  """A simple visualize alternative.
+
+  When the pcoll_name and pcoll pair identifies a watched and computed
+  PCollection in the current interactive environment without ambiguity, an
+  ElementStream can be built directly from cache.
+  """
+  pipeline = ie.current_env().user_pipeline(pcoll.pipeline)
+  rm = ie.current_env().get_recording_manager(pipeline, create_if_absent=True)
+  stream = rm.read(
+      pcoll_name, pcoll, max_n=float('inf'), max_duration_secs=float('inf'))
+  if stream:
+    visualize(stream, element_type=pcoll.element_type)
+
+
 class PCollectionVisualization(object):
   """A visualization of a PCollection.
 
diff --git a/sdks/python/apache_beam/runners/interactive/display/pipeline_graph.py b/sdks/python/apache_beam/runners/interactive/display/pipeline_graph.py
index d084569..a94113d 100644
--- a/sdks/python/apache_beam/runners/interactive/display/pipeline_graph.py
+++ b/sdks/python/apache_beam/runners/interactive/display/pipeline_graph.py
@@ -182,8 +182,8 @@
       for pcoll_id in transform.outputs.values():
         pcoll_node = None
         if self._pipeline_instrument:
-          pcoll_node = self._pipeline_instrument.cacheable_var_by_pcoll_id(
-              pcoll_id)
+          cacheable = self._pipeline_instrument.cacheables.get(pcoll_id)
+          pcoll_node = cacheable.var if cacheable else None
         # If no PipelineInstrument is available or the PCollection is not
         # watched.
         if not pcoll_node:
diff --git a/sdks/python/apache_beam/runners/interactive/interactive_environment.py b/sdks/python/apache_beam/runners/interactive/interactive_environment.py
index 102f10a..e22dee7 100644
--- a/sdks/python/apache_beam/runners/interactive/interactive_environment.py
+++ b/sdks/python/apache_beam/runners/interactive/interactive_environment.py
@@ -201,6 +201,10 @@
     # A singleton inspector instance to message information of current
     # environment to other applications.
     self._inspector = InteractiveEnvironmentInspector()
+    # A similar singleton inspector except it includes synthetic variables
+    # generated by Interactive Beam.
+    self._inspector_with_synthetic = InteractiveEnvironmentInspector(
+        ignore_synthetic=False)
 
   @property
   def options(self):
@@ -235,9 +239,16 @@
   @property
   def inspector(self):
     """Gets the singleton InteractiveEnvironmentInspector to retrieve
-    information consumable by other applications."""
+    information consumable by other applications such as a notebook
+    extension."""
     return self._inspector
 
+  @property
+  def inspector_with_synthetic(self):
+    """Gets the singleton InteractiveEnvironmentInspector with additional
+    synthetic variables generated by Interactive Beam. Internally used."""
+    return self._inspector_with_synthetic
+
   def cleanup(self, pipeline=None):
     """Cleans up cached states for the given pipeline. Noop if the given
     pipeline is absent from the environment. Cleans up for all pipelines
@@ -560,6 +571,8 @@
     """Evicts the user pipeline and its derived pipelines."""
     if user_pipeline:
       self._tracked_user_pipelines.evict(user_pipeline)
+    else:
+      self._tracked_user_pipelines.clear()
 
   def pipeline_id_to_pipeline(self, pid):
     """Converts a pipeline id to a user pipeline.
diff --git a/sdks/python/apache_beam/runners/interactive/interactive_runner.py b/sdks/python/apache_beam/runners/interactive/interactive_runner.py
index 68a66e0..4778737 100644
--- a/sdks/python/apache_beam/runners/interactive/interactive_runner.py
+++ b/sdks/python/apache_beam/runners/interactive/interactive_runner.py
@@ -34,6 +34,7 @@
 from apache_beam.runners.interactive.display import pipeline_graph
 from apache_beam.runners.interactive.options import capture_control
 from apache_beam.runners.interactive.utils import to_element_list
+from apache_beam.runners.interactive.utils import watch_sources
 from apache_beam.testing.test_stream_service import TestStreamServiceController
 
 # size of PCollection samples cached.
@@ -129,7 +130,7 @@
       ie.current_env().evict_computed_pcollections()
 
     # Make sure that sources without a user reference are still cached.
-    inst.watch_sources(pipeline)
+    watch_sources(pipeline)
 
     user_pipeline = ie.current_env().user_pipeline(pipeline)
     pipeline_instrument = inst.build_pipeline_instrument(pipeline, options)
diff --git a/sdks/python/apache_beam/runners/interactive/messaging/interactive_environment_inspector.py b/sdks/python/apache_beam/runners/interactive/messaging/interactive_environment_inspector.py
index ed3dc51..db53e87 100644
--- a/sdks/python/apache_beam/runners/interactive/messaging/interactive_environment_inspector.py
+++ b/sdks/python/apache_beam/runners/interactive/messaging/interactive_environment_inspector.py
@@ -36,16 +36,17 @@
   list_inspectables first then communicates back to the kernel and get_val for
   usage on the kernel side.
   """
-  def __init__(self):
+  def __init__(self, ignore_synthetic=True):
     self._inspectables = {}
     self._anonymous = {}
     self._inspectable_pipelines = set()
+    self._ignore_synthetic = ignore_synthetic
 
   @property
   def inspectables(self):
     """Lists pipelines and pcollections assigned to variables as inspectables.
     """
-    self._inspectables = inspect()
+    self._inspectables = inspect(self._ignore_synthetic)
     return self._inspectables
 
   @property
@@ -136,7 +137,7 @@
     return {}
 
 
-def inspect():
+def inspect(ignore_synthetic=True):
   """Inspects current interactive environment to track metadata and values of
   pipelines and pcollections.
 
@@ -148,7 +149,7 @@
   for watching in ie.current_env().watching():
     for name, value in watching:
       # Ignore synthetic vars created by Interactive Beam itself.
-      if name.startswith('synthetic_var_'):
+      if ignore_synthetic and name.startswith('synthetic_var_'):
         continue
       metadata = meta(name, value)
       identifier = obfuscate(metadata)
diff --git a/sdks/python/apache_beam/runners/interactive/pipeline_fragment.py b/sdks/python/apache_beam/runners/interactive/pipeline_fragment.py
index 6c0a922..7564a76 100644
--- a/sdks/python/apache_beam/runners/interactive/pipeline_fragment.py
+++ b/sdks/python/apache_beam/runners/interactive/pipeline_fragment.py
@@ -65,7 +65,7 @@
     self._runner_pipeline = self._build_runner_pipeline()
     _, self._context = self._runner_pipeline.to_runner_api(return_context=True)
     from apache_beam.runners.interactive import pipeline_instrument as instr
-    self._runner_pcoll_to_id = instr.pcolls_to_pcoll_id(
+    self._runner_pcoll_to_id = instr.pcoll_to_pcoll_id(
         self._runner_pipeline, self._context)
     # Correlate components in the runner pipeline to components in the user
     # pipeline. The target pcolls are the pcolls given and defined in the user
diff --git a/sdks/python/apache_beam/runners/interactive/pipeline_instrument.py b/sdks/python/apache_beam/runners/interactive/pipeline_instrument.py
index 448fca7..065d555 100644
--- a/sdks/python/apache_beam/runners/interactive/pipeline_instrument.py
+++ b/sdks/python/apache_beam/runners/interactive/pipeline_instrument.py
@@ -23,20 +23,24 @@
 """
 # pytype: skip-file
 
+import logging
+from typing import Dict
+
 import apache_beam as beam
 from apache_beam.pipeline import PipelineVisitor
 from apache_beam.portability.api import beam_runner_api_pb2
-from apache_beam.runners.interactive import cache_manager as cache
 from apache_beam.runners.interactive import interactive_environment as ie
 from apache_beam.runners.interactive import pipeline_fragment as pf
 from apache_beam.runners.interactive import background_caching_job
+from apache_beam.runners.interactive import utils
 from apache_beam.runners.interactive.caching.cacheable import Cacheable
 from apache_beam.runners.interactive.caching.cacheable import CacheKey
+from apache_beam.runners.interactive.caching.reify import WRITE_CACHE
+from apache_beam.runners.interactive.caching.reify import reify_to_cache
+from apache_beam.runners.interactive.caching.reify import unreify_from_cache
 from apache_beam.testing import test_stream
-from apache_beam.transforms.window import WindowedValue
 
-READ_CACHE = "_ReadCache_"
-WRITE_CACHE = "_WriteCache_"
+_LOGGER = logging.getLogger(__name__)
 
 
 class PipelineInstrument(object):
@@ -74,20 +78,13 @@
      context) = self._pipeline.to_runner_api(return_context=True)
 
     # All compute-once-against-original-pipeline fields.
-    self._unbounded_sources = unbounded_sources(
+    self._unbounded_sources = utils.unbounded_sources(
         self._background_caching_pipeline)
-    # TODO(BEAM-7760): once cache scope changed, this is not needed to manage
-    # relationships across pipelines, runners, and jobs.
-    self._pcolls_to_pcoll_id = pcolls_to_pcoll_id(self._pipeline, context)
+    self._pcoll_to_pcoll_id = pcoll_to_pcoll_id(self._pipeline, context)
 
-    # A mapping from PCollection id to python id() value in user defined
-    # pipeline instance.
-    (
-        self._pcoll_version_map,
-        self._cacheables,
-        # A dict from pcoll_id to variable name of the referenced PCollection.
-        # (Dict[str, str])
-        self._cacheable_var_by_pcoll_id) = cacheables(self.pcolls_to_pcoll_id)
+    # A Dict[str, Cacheable] from a PCollection id to a Cacheable that belongs
+    # to the analyzed pipeline.
+    self._cacheables = self.find_cacheables()
 
     # A dict from cache key to PCollection that is read from cache.
     # If exists, caller should reuse the PCollection read. If not, caller
@@ -280,7 +277,7 @@
         return_context=True)
 
     # Get all the sources we want to cache.
-    sources = unbounded_sources(self._background_caching_pipeline)
+    sources = utils.unbounded_sources(self._background_caching_pipeline)
 
     # Get all the root transforms. The caching transforms will be subtransforms
     # of one of these roots.
@@ -337,27 +334,20 @@
     return pipeline_to_execute
 
   @property
-  def has_unbounded_sources(self):
-    """Returns whether the pipeline has any recordable sources.
-    """
-    return len(self._unbounded_sources) > 0
+  def cacheables(self) -> Dict[str, Cacheable]:
+    """Returns the Cacheables by PCollection ids.
 
-  @property
-  def cacheables(self):
-    """Finds cacheable PCollections from the pipeline.
-
-    The function only treats the result as cacheables since there is no
-    guarantee whether PCollections that need to be cached have been cached or
-    not. A PCollection needs to be cached when it's bound to a user defined
-    variable in the source code. Otherwise, the PCollection is not reusable
-    nor introspectable which nullifies the need of cache.
+    If you're already working with user defined pipelines and PCollections,
+    do not build a PipelineInstrument just to get the cacheables. Instead,
+    use apache_beam.runners.interactive.utils.cacheables.
     """
     return self._cacheables
 
   @property
-  def pcolls_to_pcoll_id(self):
-    """Returns a dict mapping str(PCollection)s to IDs."""
-    return self._pcolls_to_pcoll_id
+  def has_unbounded_sources(self):
+    """Returns whether the pipeline has any recordable sources.
+    """
+    return len(self._unbounded_sources) > 0
 
   @property
   def original_pipeline_proto(self):
@@ -384,6 +374,27 @@
     pipeline to instances in the user pipeline."""
     return self._runner_pcoll_to_user_pcoll
 
+  def find_cacheables(self) -> Dict[str, Cacheable]:
+    """Finds PCollections that need to be cached for analyzed pipeline.
+
+    There might be multiple pipelines defined and watched; this will only find
+    cacheables that belong to the analyzed pipeline.
+    """
+    result = {}
+    cacheables = utils.cacheables()
+    for _, cacheable in cacheables.items():
+      if cacheable.pcoll.pipeline is not self._user_pipeline:
+        # Ignore all cacheables from other pipelines.
+        continue
+      pcoll_id = self.pcoll_id(cacheable.pcoll)
+      if not pcoll_id:
+        _LOGGER.debug(
+            'Unable to retrieve PCollection id for %s. Ignored.',
+            cacheable.pcoll)
+        continue
+      result[self.pcoll_id(cacheable.pcoll)] = cacheable
+    return result
+
   def instrument(self):
     """Instruments original pipeline with cache.
 
@@ -418,13 +429,13 @@
 
     v = InstrumentVisitor(self)
     self._pipeline.visit(v)
-
     # Every output PCollection that is never used as an input PCollection is
     # considered as a side effect of the pipeline run and should be included.
     self._extended_targets.update(all_outputs.difference(all_inputs))
-    # Add the unbounded source pcollections to the cacheable inputs. This allows
+    # Add the unbounded source PCollections to the cacheable inputs. This allows
     # for the caching of unbounded sources without a variable reference.
     cacheable_inputs.update(unbounded_source_pcolls)
+
     # Create ReadCache transforms.
     for cacheable_input in cacheable_inputs:
       self._read_cache(
@@ -435,7 +446,7 @@
     self._replace_with_cached_inputs(self._pipeline)
 
     # Write cache for all cacheables.
-    for _, cacheable in self.cacheables.items():
+    for _, cacheable in self._cacheables.items():
       self._write_cache(
           self._pipeline, cacheable.pcoll, ignore_unbounded_reads=True)
 
@@ -499,13 +510,13 @@
           self._process(out_pcoll)
 
       def _process(self, pcoll):
-        pcoll_id = self._pin.pcolls_to_pcoll_id.get(str(pcoll), '')
-        if pcoll_id in self._pin._pcoll_version_map:
-          cacheable_key = self._pin._cacheable_key(pcoll)
-          user_pcoll = self._pin.cacheables[cacheable_key].pcoll
-          if (cacheable_key in self._pin.cacheables and user_pcoll != pcoll):
+        pcoll_id = self._pin.pcoll_id(pcoll)
+        if pcoll_id in self._pin._cacheables:
+          # Membership was just checked, so this lookup cannot fail.
+          user_pcoll = self._pin._cacheables[pcoll_id].pcoll
+          if user_pcoll != pcoll:
             self._pin._runner_pcoll_to_user_pcoll[pcoll] = user_pcoll
-            self._pin.cacheables[cacheable_key].pcoll = pcoll
+            self._pin._cacheables[pcoll_id].pcoll = pcoll
 
     v = PreprocessVisitor(self)
     self._pipeline.visit(v)
@@ -552,29 +563,17 @@
     key = self.cache_key(pcoll)
     # Only need to write when the cache with expected key doesn't exist.
     if not self._cache_manager.exists('full', key):
-      label = '{}{}'.format(WRITE_CACHE, key)
-
       self.cached_pcolls.add(self.runner_pcoll_to_user_pcoll.get(pcoll, pcoll))
-
       # Read the windowing information and cache it along with the element. This
       # caches the arguments to a WindowedValue object because Python has logic
       # that detects if a DoFn returns a WindowedValue. When it detecs one, it
       # puts the element into the correct window then emits the value to
       # downstream transforms.
-      class Reify(beam.DoFn):
-        def process(
-            self,
-            e,
-            w=beam.DoFn.WindowParam,
-            p=beam.DoFn.PaneInfoParam,
-            t=beam.DoFn.TimestampParam):
-          yield test_stream.WindowedValueHolder(WindowedValue(e, t, [w], p))
-
-      extended_target = (
-          pcoll
-          | label + 'reify' >> beam.ParDo(Reify())
-          | label >> cache.WriteCache(
-              self._cache_manager, key, is_capture=is_capture))
+      extended_target = reify_to_cache(
+          pcoll=pcoll,
+          cache_key=key,
+          cache_manager=self._cache_manager,
+          is_capture=is_capture)
       if output_as_extended_target:
         self._extended_targets.add(extended_target)
 
@@ -606,15 +605,8 @@
 
         # To put the cached value into the correct window, simply return a
         # WindowedValue constructed from the element.
-        class Unreify(beam.DoFn):
-          def process(self, e):
-            yield e.windowed_value
-
-        pcoll_from_cache = (
-            pipeline
-            | '{}{}'.format(READ_CACHE, key) >> cache.ReadCache(
-                self._cache_manager, key)
-            | '{}{}unreify'.format(READ_CACHE, key) >> beam.ParDo(Unreify()))
+        pcoll_from_cache = unreify_from_cache(
+            pipeline=pipeline, cache_key=key, cache_manager=self._cache_manager)
         self._cached_pcoll_read[key] = pcoll_from_cache
     # else: NOOP when cache doesn't exist, just compute the original graph.
 
@@ -704,7 +696,7 @@
   def _cacheable_inputs(self, transform):
     inputs = set()
     for in_pcoll in transform.inputs:
-      if self._cacheable_key(in_pcoll) in self.cacheables:
+      if self.pcoll_id(in_pcoll) in self._cacheables:
         inputs.add(in_pcoll)
     return inputs
 
@@ -717,50 +709,35 @@
       outputs.add(out_pcoll)
     return inputs, outputs
 
-  def _cacheable_key(self, pcoll):
-    """Gets the key a cacheable PCollection is tracked within the instrument."""
-    return cacheable_key(
-        pcoll, self.pcolls_to_pcoll_id, self._pcoll_version_map)
+  def pcoll_id(self, pcoll):
+    """Looks up the PCollection id of the given pcoll by its str() form.
+
+    Returns '' when the pcoll is not found.
+    """
+    return self._pcoll_to_pcoll_id.get(str(pcoll), '')
 
   def cache_key(self, pcoll):
     """Gets the identifier of a cacheable PCollection in cache.
 
     If the pcoll is not a cacheable, return ''.
+    This is only needed in pipeline instrument when the origin of given pcoll
+    is unknown (whether it's from the user pipeline or a runner pipeline). If
+    a pcoll is from the user pipeline, always use CacheKey.from_pcoll to build
+    the key.
     The key is what the pcoll would use as identifier if it's materialized in
     cache. It doesn't mean that there would definitely be such cache already.
     Also, the pcoll can come from the original user defined pipeline object or
     an equivalent pcoll from a transformed copy of the original pipeline.
-
-    'pcoll_id' of cacheable is not stable for cache_key, thus not included in
-    cache key. A combination of 'var', 'version' and 'producer_version' is
-    sufficient to identify a cached PCollection.
     """
-    cacheable = self.cacheables.get(self._cacheable_key(pcoll), None)
+    cacheable = self._cacheables.get(self.pcoll_id(pcoll), None)
     if cacheable:
       if cacheable.pcoll in self.runner_pcoll_to_user_pcoll:
         user_pcoll = self.runner_pcoll_to_user_pcoll[cacheable.pcoll]
       else:
         user_pcoll = cacheable.pcoll
-
-      return repr(
-          CacheKey(
-              cacheable.var,
-              cacheable.version,
-              cacheable.producer_version,
-              str(id(user_pcoll.pipeline))))
+      return CacheKey.from_pcoll(cacheable.var, user_pcoll).to_str()
     return ''
 
-  def cacheable_var_by_pcoll_id(self, pcoll_id):
-    """Retrieves the variable name of a PCollection.
-
-    In source code, PCollection variables are defined in the user pipeline. When
-    it's converted to the runner api representation, each PCollection referenced
-    in the user pipeline is assigned a unique-within-pipeline pcoll_id. Given
-    such pcoll_id, retrieves the str variable name defined in user pipeline for
-    that referenced PCollection. If the PCollection is not watched, return None.
-    """
-    return self._cacheable_var_by_pcoll_id.get(pcoll_id, None)
-
 
 def build_pipeline_instrument(pipeline, options=None):
   """Creates PipelineInstrument for a pipeline and its options with cache.
@@ -782,83 +759,7 @@
   return pi
 
 
-def cacheables(pcolls_to_pcoll_id):
-  """Finds PCollections that need to be cached for analyzed PCollections.
-
-  The function only treats the result as cacheables since there is no guarantee
-  whether PCollections that need to be cached have been cached or not. A
-  PCollection needs to be cached when it's bound to a user defined variable in
-  the source code. Otherwise, the PCollection is not reusable nor introspectable
-  which nullifies the need of cache. There might be multiple pipelines defined
-  and watched, this will only return for PCollections with pcolls_to_pcoll_id
-  analyzed. The check is not strict because pcoll_id is not unique across
-  multiple pipelines. Additional check needs to be done during instrument.
-  """
-  pcoll_version_map = {}
-  cacheables = {}
-  cacheable_var_by_pcoll_id = {}
-  for watching in ie.current_env().watching():
-    for key, val in watching:
-      if isinstance(val, beam.pvalue.PCollection):
-        pcoll_id = pcolls_to_pcoll_id.get(str(val), None)
-        # It's highly possible that PCollection str is not unique across
-        # multiple pipelines, further check during instrument is needed.
-        if not pcoll_id:
-          continue
-
-        cacheable = Cacheable(
-            pcoll_id=pcoll_id,
-            var=key,
-            version=str(id(val)),
-            pcoll=val,
-            producer_version=str(id(val.producer)))
-        pcoll_version_map[cacheable.pcoll_id] = cacheable.version
-        cacheables[cacheable_key(val, pcolls_to_pcoll_id)] = cacheable
-        cacheable_var_by_pcoll_id[cacheable.pcoll_id] = key
-
-  return pcoll_version_map, cacheables, cacheable_var_by_pcoll_id
-
-
-def cacheable_key(pcoll, pcolls_to_pcoll_id, pcoll_version_map=None):
-  pcoll_version = str(id(pcoll))
-  pcoll_id = pcolls_to_pcoll_id.get(str(pcoll), '')
-  if pcoll_version_map:
-    original_pipeline_pcoll_version = pcoll_version_map.get(pcoll_id, None)
-    if original_pipeline_pcoll_version:
-      pcoll_version = original_pipeline_pcoll_version
-  return '_'.join((pcoll_version, pcoll_id))
-
-
-def has_unbounded_sources(pipeline):
-  """Checks if a given pipeline has recordable sources."""
-  return len(unbounded_sources(pipeline)) > 0
-
-
-def unbounded_sources(pipeline):
-  """Returns a pipeline's recordable sources."""
-  class CheckUnboundednessVisitor(PipelineVisitor):
-    """Visitor checks if there are any unbounded read sources in the Pipeline.
-
-    Visitor visits all nodes and checks if it is an instance of recordable
-    sources.
-    """
-    def __init__(self):
-      self.unbounded_sources = []
-
-    def enter_composite_transform(self, transform_node):
-      self.visit_transform(transform_node)
-
-    def visit_transform(self, transform_node):
-      if isinstance(transform_node.transform,
-                    tuple(ie.current_env().options.recordable_sources)):
-        self.unbounded_sources.append(transform_node)
-
-  v = CheckUnboundednessVisitor()
-  pipeline.visit(v)
-  return v.unbounded_sources
-
-
-def pcolls_to_pcoll_id(pipeline, original_context):
+def pcoll_to_pcoll_id(pipeline, original_context):
   """Returns a dict mapping PCollections string to PCollection IDs.
 
   Using a PipelineVisitor to iterate over every node in the pipeline,
@@ -878,42 +779,16 @@
     results in validation errors.
     """
     def __init__(self):
-      self.pcolls_to_pcoll_id = {}
+      self.pcoll_to_pcoll_id = {}
 
     def enter_composite_transform(self, transform_node):
       self.visit_transform(transform_node)
 
     def visit_transform(self, transform_node):
       for pcoll in transform_node.outputs.values():
-        self.pcolls_to_pcoll_id[str(pcoll)] = (
+        self.pcoll_to_pcoll_id[str(pcoll)] = (
             original_context.pcollections.get_id(pcoll))
 
   v = PCollVisitor()
   pipeline.visit(v)
-  return v.pcolls_to_pcoll_id
-
-
-def watch_sources(pipeline):
-  """Watches the unbounded sources in the pipeline.
-
-  Sources can output to a PCollection without a user variable reference. In
-  this case the source is not cached. We still want to cache the data so we
-  synthetically create a variable to the intermediate PCollection.
-  """
-
-  retrieved_user_pipeline = ie.current_env().user_pipeline(pipeline)
-
-  class CacheableUnboundedPCollectionVisitor(PipelineVisitor):
-    def __init__(self):
-      self.unbounded_pcolls = set()
-
-    def enter_composite_transform(self, transform_node):
-      self.visit_transform(transform_node)
-
-    def visit_transform(self, transform_node):
-      if isinstance(transform_node.transform,
-                    tuple(ie.current_env().options.recordable_sources)):
-        for pcoll in transform_node.outputs.values():
-          ie.current_env().watch({'synthetic_var_' + str(id(pcoll)): pcoll})
-
-  retrieved_user_pipeline.visit(CacheableUnboundedPCollectionVisitor())
+  return v.pcoll_to_pcoll_id
diff --git a/sdks/python/apache_beam/runners/interactive/pipeline_instrument_test.py b/sdks/python/apache_beam/runners/interactive/pipeline_instrument_test.py
index a3f91c0..bba315d 100644
--- a/sdks/python/apache_beam/runners/interactive/pipeline_instrument_test.py
+++ b/sdks/python/apache_beam/runners/interactive/pipeline_instrument_test.py
@@ -18,7 +18,6 @@
 """Tests for apache_beam.runners.interactive.pipeline_instrument."""
 # pytype: skip-file
 
-import tempfile
 import unittest
 
 import apache_beam as beam
@@ -29,6 +28,9 @@
 from apache_beam.runners.interactive import interactive_environment as ie
 from apache_beam.runners.interactive import pipeline_instrument as instr
 from apache_beam.runners.interactive import interactive_runner
+from apache_beam.runners.interactive import utils
+from apache_beam.runners.interactive.caching.cacheable import Cacheable
+from apache_beam.runners.interactive.caching.cacheable import CacheKey
 from apache_beam.runners.interactive.caching.streaming_cache import StreamingCache
 from apache_beam.runners.interactive.testing.pipeline_assertion import assert_pipeline_equal
 from apache_beam.runners.interactive.testing.pipeline_assertion import assert_pipeline_proto_contain_top_level_transform
@@ -44,38 +46,32 @@
     ie.new_env()
 
   def cache_key_of(self, name, pcoll):
-    return repr(
-        instr.CacheKey(
-            name,
-            str(id(pcoll)),
-            str(id(pcoll.producer)),
-            str(id(pcoll.pipeline))))
+    return CacheKey.from_pcoll(name, pcoll).to_str()
 
-  def test_pcolls_to_pcoll_id(self):
+  def test_pcoll_to_pcoll_id(self):
     p = beam.Pipeline(interactive_runner.InteractiveRunner())
     ie.current_env().set_cache_manager(InMemoryCache(), p)
     # pylint: disable=range-builtin-not-iterating
     init_pcoll = p | 'Init Create' >> beam.Impulse()
     _, ctx = p.to_runner_api(return_context=True)
     self.assertEqual(
-        instr.pcolls_to_pcoll_id(p, ctx),
+        instr.pcoll_to_pcoll_id(p, ctx),
         {str(init_pcoll): 'ref_PCollection_PCollection_1'})
 
-  def test_cacheable_key_without_version_map(self):
-    p = beam.Pipeline(interactive_runner.InteractiveRunner())
-    ie.current_env().set_cache_manager(InMemoryCache(), p)
-    # pylint: disable=range-builtin-not-iterating
-    init_pcoll = p | 'Init Create' >> beam.Create(range(10))
-    _, ctx = p.to_runner_api(return_context=True)
+  def test_pcoll_id_with_user_pipeline(self):
+    p_id_user = beam.Pipeline(interactive_runner.InteractiveRunner())
+    ie.current_env().set_cache_manager(InMemoryCache(), p_id_user)
+    init_pcoll = p_id_user | 'Init Create' >> beam.Create([1, 2, 3])
+    instrumentation = instr.build_pipeline_instrument(p_id_user)
     self.assertEqual(
-        instr.cacheable_key(init_pcoll, instr.pcolls_to_pcoll_id(p, ctx)),
-        str(id(init_pcoll)) + '_ref_PCollection_PCollection_8')
+        instrumentation.pcoll_id(init_pcoll), 'ref_PCollection_PCollection_8')
 
-  def test_cacheable_key_with_version_map(self):
-    p = beam.Pipeline(interactive_runner.InteractiveRunner())
-    ie.current_env().set_cache_manager(InMemoryCache(), p)
-    # pylint: disable=range-builtin-not-iterating
-    init_pcoll = p | 'Init Create' >> beam.Create(range(10))
+  def test_pcoll_id_with_runner_pipeline(self):
+    p_id_runner = beam.Pipeline(interactive_runner.InteractiveRunner())
+    ie.current_env().set_cache_manager(InMemoryCache(), p_id_runner)
+    # pylint: disable=possibly-unused-variable
+    init_pcoll = p_id_runner | 'Init Create' >> beam.Create([1, 2, 3])
+    ib.watch(locals())
 
     # It's normal that when executing, the pipeline object is a different
     # but equivalent instance from what user has built. The pipeline instrument
@@ -84,20 +80,16 @@
     # version map can be used to figure out what the PCollection instances are
     # in the original instance and if the evaluation has changed since last
     # execution.
-    p2 = beam.Pipeline(interactive_runner.InteractiveRunner())
-    ie.current_env().set_cache_manager(InMemoryCache(), p2)
+    p2_id_runner = beam.Pipeline(interactive_runner.InteractiveRunner())
     # pylint: disable=range-builtin-not-iterating
-    init_pcoll_2 = p2 | 'Init Create' >> beam.Create(range(10))
-    _, ctx = p2.to_runner_api(return_context=True)
+    init_pcoll_2 = p2_id_runner | 'Init Create' >> beam.Create(range(10))
+    ie.current_env().add_derived_pipeline(p_id_runner, p2_id_runner)
 
-    # The cacheable_key should use id(init_pcoll) as prefix even when
+    instrumentation = instr.build_pipeline_instrument(p2_id_runner)
+    # The cache_key should use id(init_pcoll) as prefix even when
     # init_pcoll_2 is supplied as long as the version map is given.
     self.assertEqual(
-        instr.cacheable_key(
-            init_pcoll_2,
-            instr.pcolls_to_pcoll_id(p2, ctx),
-            {'ref_PCollection_PCollection_8': str(id(init_pcoll))}),
-        str(id(init_pcoll)) + '_ref_PCollection_PCollection_8')
+        instrumentation.pcoll_id(init_pcoll_2), 'ref_PCollection_PCollection_8')
 
   def test_cache_key(self):
     p = beam.Pipeline(interactive_runner.InteractiveRunner())
@@ -120,63 +112,36 @@
         pipeline_instrument.cache_key(cubes), self.cache_key_of('cubes', cubes))
 
   def test_cacheables(self):
-    p = beam.Pipeline(interactive_runner.InteractiveRunner())
-    ie.current_env().set_cache_manager(InMemoryCache(), p)
+    p_cacheables = beam.Pipeline(interactive_runner.InteractiveRunner())
+    ie.current_env().set_cache_manager(InMemoryCache(), p_cacheables)
     # pylint: disable=range-builtin-not-iterating
-    init_pcoll = p | 'Init Create' >> beam.Create(range(10))
+    init_pcoll = p_cacheables | 'Init Create' >> beam.Create(range(10))
     squares = init_pcoll | 'Square' >> beam.Map(lambda x: x * x)
     cubes = init_pcoll | 'Cube' >> beam.Map(lambda x: x**3)
     ib.watch(locals())
 
-    pipeline_instrument = instr.build_pipeline_instrument(p)
-
-    # TODO(BEAM-7760): The PipelineInstrument cacheables maintains a global list
-    # of cacheable PCollections across all pipelines. Here we take the subset of
-    # cacheables that only pertain to this test's pipeline.
-    cacheables = {
-        k: c
-        for k,
-        c in pipeline_instrument.cacheables.items() if c.pcoll.pipeline is p
-    }
+    pipeline_instrument = instr.build_pipeline_instrument(p_cacheables)
 
     self.assertEqual(
-        cacheables,
+        pipeline_instrument._cacheables,
         {
-            pipeline_instrument._cacheable_key(init_pcoll): instr.Cacheable(
+            pipeline_instrument.pcoll_id(init_pcoll): Cacheable(
                 var='init_pcoll',
                 version=str(id(init_pcoll)),
-                pcoll_id='ref_PCollection_PCollection_8',
                 producer_version=str(id(init_pcoll.producer)),
                 pcoll=init_pcoll),
-            pipeline_instrument._cacheable_key(squares): instr.Cacheable(
+            pipeline_instrument.pcoll_id(squares): Cacheable(
                 var='squares',
                 version=str(id(squares)),
-                pcoll_id='ref_PCollection_PCollection_9',
                 producer_version=str(id(squares.producer)),
                 pcoll=squares),
-            pipeline_instrument._cacheable_key(cubes): instr.Cacheable(
+            pipeline_instrument.pcoll_id(cubes): Cacheable(
                 var='cubes',
                 version=str(id(cubes)),
-                pcoll_id='ref_PCollection_PCollection_10',
                 producer_version=str(id(cubes.producer)),
                 pcoll=cubes)
         })
 
-  def test_has_unbounded_source(self):
-    p = beam.Pipeline(interactive_runner.InteractiveRunner())
-    ie.current_env().set_cache_manager(InMemoryCache(), p)
-    _ = p | 'ReadUnboundedSource' >> beam.io.ReadFromPubSub(
-        subscription='projects/fake-project/subscriptions/fake_sub')
-    self.assertTrue(instr.has_unbounded_sources(p))
-
-  def test_not_has_unbounded_source(self):
-    p = beam.Pipeline(interactive_runner.InteractiveRunner())
-    ie.current_env().set_cache_manager(InMemoryCache(), p)
-    with tempfile.NamedTemporaryFile(delete=False) as f:
-      f.write(b'test')
-    _ = p | 'ReadBoundedSource' >> beam.io.ReadFromText(f.name)
-    self.assertFalse(instr.has_unbounded_sources(p))
-
   def test_background_caching_pipeline_proto(self):
     p = beam.Pipeline(interactive_runner.InteractiveRunner())
     ie.current_env().set_cache_manager(StreamingCache(cache_dir=None), p)
@@ -215,12 +180,11 @@
         | 'b' >> cache.WriteCache(ie.current_env().get_cache_manager(p), ''))
 
     expected_pipeline = p.to_runner_api(return_context=False)
-
     assert_pipeline_proto_equal(self, expected_pipeline, actual_pipeline)
 
   def _example_pipeline(self, watch=True, bounded=True):
-    p = beam.Pipeline(interactive_runner.InteractiveRunner())
-    ie.current_env().set_cache_manager(InMemoryCache(), p)
+    p_example = beam.Pipeline(interactive_runner.InteractiveRunner())
+    ie.current_env().set_cache_manager(InMemoryCache(), p_example)
     # pylint: disable=range-builtin-not-iterating
     if bounded:
       source = beam.Create(range(10))
@@ -228,11 +192,11 @@
       source = beam.io.ReadFromPubSub(
           subscription='projects/fake-project/subscriptions/fake_sub')
 
-    init_pcoll = p | 'Init Source' >> source
+    init_pcoll = p_example | 'Init Source' >> source
     second_pcoll = init_pcoll | 'Second' >> beam.Map(lambda x: x * x)
     if watch:
       ib.watch(locals())
-    return (p, init_pcoll, second_pcoll)
+    return (p_example, init_pcoll, second_pcoll)
 
   def _mock_write_cache(self, pipeline, values, cache_key):
     """Cache the PCollection where cache.WriteCache would write to."""
@@ -248,7 +212,8 @@
     # Original instance defined by user code has all variables handlers.
     p_origin, init_pcoll, second_pcoll = self._example_pipeline()
     # Copied instance when execution has no user defined variables.
-    p_copy, _, _ = self._example_pipeline(False)
+    p_copy, _, _ = self._example_pipeline(watch=False)
+    ie.current_env().add_derived_pipeline(p_origin, p_copy)
     # Instrument the copied pipeline.
     pipeline_instrument = instr.build_pipeline_instrument(p_copy)
     # Manually instrument original pipeline with expected pipeline transforms.
@@ -337,7 +302,8 @@
 
     # Mock as if cacheable PCollections are cached.
     ib.watch(locals())
-
+    # This should be noop.
+    utils.watch_sources(p_original)
     for name, pcoll in locals().items():
       if not isinstance(pcoll, beam.pvalue.PCollection):
         continue
@@ -395,21 +361,22 @@
     from apache_beam.options.pipeline_options import StandardOptions
     options = StandardOptions(streaming=True)
     streaming_cache_manager = StreamingCache(cache_dir=None)
-    p_original = beam.Pipeline(interactive_runner.InteractiveRunner(), options)
-    ie.current_env().set_cache_manager(streaming_cache_manager, p_original)
+    p_original_cache_source = beam.Pipeline(
+        interactive_runner.InteractiveRunner(), options)
+    ie.current_env().set_cache_manager(
+        streaming_cache_manager, p_original_cache_source)
 
     # pylint: disable=possibly-unused-variable
     source_1 = (
-        p_original
+        p_original_cache_source
         | 'source1' >> beam.io.ReadFromPubSub(
             subscription='projects/fake-project/subscriptions/fake_sub')
         | beam.Map(lambda e: e))
 
     # Watch but do not cache the PCollections.
     ib.watch(locals())
-
     # Make sure that sources without a user reference are still cached.
-    instr.watch_sources(p_original)
+    utils.watch_sources(p_original_cache_source)
 
     intermediate_source_pcoll = None
     for watching in ie.current_env().watching():
@@ -421,14 +388,17 @@
 
     # Instrument the original pipeline to create the pipeline the user will see.
     p_copy = beam.Pipeline.from_runner_api(
-        p_original.to_runner_api(),
+        p_original_cache_source.to_runner_api(),
         runner=interactive_runner.InteractiveRunner(),
         options=options)
+    ie.current_env().add_derived_pipeline(p_original_cache_source, p_copy)
     instrumenter = instr.build_pipeline_instrument(p_copy)
     actual_pipeline = beam.Pipeline.from_runner_api(
         proto=instrumenter.instrumented_pipeline_proto(),
         runner=interactive_runner.InteractiveRunner(),
         options=options)
+    ie.current_env().add_derived_pipeline(
+        p_original_cache_source, actual_pipeline)
 
     # Now, build the expected pipeline which replaces the unbounded source with
     # a TestStream.
@@ -496,7 +466,8 @@
 
     # Watch but do not cache the PCollections.
     ib.watch(locals())
-
+    # This should be noop.
+    utils.watch_sources(p_original)
     self._mock_write_cache(
         p_original, [], self.cache_key_of('source_2', source_2))
     ie.current_env().mark_pcollection_computed([source_2])
@@ -563,36 +534,39 @@
     # Create the pipeline that will be instrumented.
     from apache_beam.options.pipeline_options import StandardOptions
     options = StandardOptions(streaming=True)
-    p_original = beam.Pipeline(interactive_runner.InteractiveRunner(), options)
+    p_original_direct_source = beam.Pipeline(
+        interactive_runner.InteractiveRunner(), options)
     ie.current_env().set_cache_manager(
-        StreamingCache(cache_dir=None), p_original)
-    source_1 = p_original | 'source1' >> beam.io.ReadFromPubSub(
+        StreamingCache(cache_dir=None), p_original_direct_source)
+    source_1 = p_original_direct_source | 'source1' >> beam.io.ReadFromPubSub(
         subscription='projects/fake-project/subscriptions/fake_sub')
     # pylint: disable=possibly-unused-variable
-
+    p_expected = beam.Pipeline()
+    # pylint: disable=unused-variable
+    test_stream = (
+        p_expected
+        | TestStream(output_tags=[self.cache_key_of('source_1', source_1)]))
     # Watch but do not cache the PCollections.
     ib.watch(locals())
-
+    # This should be noop.
+    utils.watch_sources(p_original_direct_source)
     # Instrument the original pipeline to create the pipeline the user will see.
     p_copy = beam.Pipeline.from_runner_api(
-        p_original.to_runner_api(),
+        p_original_direct_source.to_runner_api(),
         runner=interactive_runner.InteractiveRunner(),
         options=options)
+    ie.current_env().add_derived_pipeline(p_original_direct_source, p_copy)
     instrumenter = instr.build_pipeline_instrument(p_copy)
     actual_pipeline = beam.Pipeline.from_runner_api(
         proto=instrumenter.instrumented_pipeline_proto(),
         runner=interactive_runner.InteractiveRunner(),
         options=options)
+    ie.current_env().add_derived_pipeline(
+        p_original_direct_source, actual_pipeline)
 
     # Now, build the expected pipeline which replaces the unbounded source with
     # a TestStream.
     source_1_cache_key = self.cache_key_of('source_1', source_1)
-    p_expected = beam.Pipeline()
-
-    # pylint: disable=unused-variable
-    test_stream = (
-        p_expected
-        | TestStream(output_tags=[self.cache_key_of('source_1', source_1)]))
 
     # Test that the TestStream is outputting to the correct PCollection.
     class TestStreamVisitor(PipelineVisitor):
@@ -625,22 +599,25 @@
     # Create the pipeline that will be instrumented.
     from apache_beam.options.pipeline_options import StandardOptions
     options = StandardOptions(streaming=True)
-    p_original = beam.Pipeline(interactive_runner.InteractiveRunner(), options)
+    p_original_read_cache = beam.Pipeline(
+        interactive_runner.InteractiveRunner(), options)
     ie.current_env().set_cache_manager(
-        StreamingCache(cache_dir=None), p_original)
-    source_1 = p_original | 'source1' >> beam.io.ReadFromPubSub(
+        StreamingCache(cache_dir=None), p_original_read_cache)
+    source_1 = p_original_read_cache | 'source1' >> beam.io.ReadFromPubSub(
         subscription='projects/fake-project/subscriptions/fake_sub')
     # pylint: disable=possibly-unused-variable
     pcoll_1 = source_1 | 'square1' >> beam.Map(lambda x: x * x)
 
     # Watch but do not cache the PCollections.
     ib.watch(locals())
-
+    # This should be noop.
+    utils.watch_sources(p_original_read_cache)
     # Instrument the original pipeline to create the pipeline the user will see.
     p_copy = beam.Pipeline.from_runner_api(
-        p_original.to_runner_api(),
+        p_original_read_cache.to_runner_api(),
         runner=interactive_runner.InteractiveRunner(),
         options=options)
+    ie.current_env().add_derived_pipeline(p_original_read_cache, p_copy)
     instrumenter = instr.build_pipeline_instrument(p_copy)
     actual_pipeline = beam.Pipeline.from_runner_api(
         proto=instrumenter.instrumented_pipeline_proto(),
@@ -705,7 +682,8 @@
 
     # Mock as if cacheable PCollections are cached.
     ib.watch(locals())
-
+    # This should be noop.
+    utils.watch_sources(p_original)
     for name, pcoll in locals().items():
       if not isinstance(pcoll, beam.pvalue.PCollection):
         continue
diff --git a/sdks/python/apache_beam/runners/interactive/recording_manager.py b/sdks/python/apache_beam/runners/interactive/recording_manager.py
index c51a648..690e133 100644
--- a/sdks/python/apache_beam/runners/interactive/recording_manager.py
+++ b/sdks/python/apache_beam/runners/interactive/recording_manager.py
@@ -29,8 +29,8 @@
 from apache_beam.runners.interactive import interactive_environment as ie
 from apache_beam.runners.interactive import interactive_runner as ir
 from apache_beam.runners.interactive import pipeline_fragment as pf
-from apache_beam.runners.interactive import pipeline_instrument as pi
 from apache_beam.runners.interactive import utils
+from apache_beam.runners.interactive.caching.cacheable import CacheKey
 from apache_beam.runners.runner import PipelineState
 
 _LOGGER = logging.getLogger(__name__)
@@ -48,7 +48,7 @@
       ):
     self._pcoll = pcoll
     self._cache_key = cache_key
-    self._pipeline = pcoll.pipeline
+    self._pipeline = ie.current_env().user_pipeline(pcoll.pipeline)
     self._var = var
     self._n = max_n
     self._duration_secs = max_duration_secs
@@ -157,24 +157,22 @@
       user_pipeline,  # type: beam.Pipeline
       pcolls,  # type: List[beam.pvalue.PCollection]
       result,  # type: beam.runner.PipelineResult
-      pipeline_instrument,  # type: beam.runners.interactive.PipelineInstrument
       max_n,  # type: int
       max_duration_secs,  # type: float
       ):
-
     self._user_pipeline = user_pipeline
     self._result = result
     self._result_lock = threading.Lock()
     self._pcolls = pcolls
-
-    pcoll_var = lambda pcoll: pipeline_instrument.cacheable_var_by_pcoll_id(
-        pipeline_instrument.pcolls_to_pcoll_id.get(str(pcoll), None))
+    pcoll_var = lambda pcoll: {v: k
+                               for k, v in utils.pcoll_by_name().items()}.get(
+                                   pcoll, None)
 
     self._streams = {
         pcoll: ElementStream(
             pcoll,
             pcoll_var(pcoll),
-            pipeline_instrument.cache_key(pcoll),
+            CacheKey.from_pcoll(pcoll_var(pcoll), pcoll).to_str(),
             max_n,
             max_duration_secs)
         for pcoll in pcolls
@@ -316,8 +314,8 @@
         ie.current_env().watch(
             {'anonymous_pcollection_{}'.format(id(pcoll)): pcoll})
 
-  def _clear(self, pipeline_instrument):
-    # type: (List[beam.pvalue.PCollection]) -> None
+  def _clear(self):
+    # type: () -> None
 
     """Clears the recording of all non-source PCollections."""
 
@@ -327,7 +325,7 @@
     # BackgroundCachingJob.
     computed = ie.current_env().computed_pcollections
     cacheables = [
-        c for c in pipeline_instrument.cacheables.values()
+        c for c in utils.cacheables().values()
         if c.pcoll.pipeline is self.user_pipeline and c.pcoll not in computed
     ]
     all_cached = set(str(c.to_key()) for c in cacheables)
@@ -397,7 +395,7 @@
 
     # Make sure that sources without a user reference are still cached.
     ie.current_env().add_user_pipeline(self.user_pipeline)
-    pi.watch_sources(self.user_pipeline)
+    utils.watch_sources(self.user_pipeline)
 
     # Attempt to run background caching job to record any sources.
     if ie.current_env().is_in_ipython:
@@ -437,7 +435,6 @@
     # watch it. No validation is needed here because the watch logic can handle
     # arbitrary variables.
     self._watch(pcolls)
-    pipeline_instrument = pi.PipelineInstrument(self.user_pipeline)
     self.record_pipeline()
 
     # Get the subset of computed PCollections. These do not to be recomputed.
@@ -450,7 +447,7 @@
     if uncomputed_pcolls:
       # Clear the cache of the given uncomputed PCollections because they are
       # incomplete.
-      self._clear(pipeline_instrument)
+      self._clear()
 
       warnings.filterwarnings(
           'ignore',
@@ -464,12 +461,29 @@
       result = None
 
     recording = Recording(
-        self.user_pipeline,
-        pcolls,
-        result,
-        pipeline_instrument,
-        max_n,
-        max_duration_secs)
+        self.user_pipeline, pcolls, result, max_n, max_duration_secs)
     self._recordings.add(recording)
 
     return recording
+
+  def read(self, pcoll_name, pcoll, max_n, max_duration_secs):
+    # type: (str, beam.pvalue.PValue, int, float) -> Union[None, ElementStream]
+
+    """Creates an ElementStream for reading a computed PCollection.
+
+    The caller is responsible for validating that the given pcoll_name and
+    pcoll identify, without ambiguity, a watched and computed PCollection in
+    the notebook. If an error occurs, it is logged and None is returned.
+    """
+
+    stream = None
+    try:
+      key = CacheKey.from_pcoll(pcoll_name, pcoll).to_str()
+      stream = ElementStream(pcoll, pcoll_name, key, max_n, max_duration_secs)
+    except (KeyboardInterrupt, SystemExit):
+      raise
+    except Exception as e:
+      # Validation is left to the caller to avoid doing it twice; when the
+      # caller skipped it, the resulting error is only logged.
+      _LOGGER.error(str(e))
+    return stream
diff --git a/sdks/python/apache_beam/runners/interactive/recording_manager_test.py b/sdks/python/apache_beam/runners/interactive/recording_manager_test.py
index 7b7a6e9..ec7c78e 100644
--- a/sdks/python/apache_beam/runners/interactive/recording_manager_test.py
+++ b/sdks/python/apache_beam/runners/interactive/recording_manager_test.py
@@ -26,7 +26,7 @@
 from apache_beam.runners.interactive import background_caching_job as bcj
 from apache_beam.runners.interactive import interactive_beam as ib
 from apache_beam.runners.interactive import interactive_environment as ie
-from apache_beam.runners.interactive import pipeline_instrument as pi
+from apache_beam.runners.interactive.caching.cacheable import CacheKey
 from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
 from apache_beam.runners.interactive.options.capture_limiters import Limiter
 from apache_beam.runners.interactive.recording_manager import ElementStream
@@ -66,12 +66,12 @@
     self.cache = InMemoryCache()
     self.p = beam.Pipeline()
     self.pcoll = self.p | beam.Create([])
-    self.cache_key = str(pi.CacheKey('pcoll', '', '', ''))
+    self.cache_key = str(CacheKey('pcoll', '', '', ''))
 
     # Create a MockPipelineResult to control the state of a fake run of the
     # pipeline.
     self.mock_result = MockPipelineResult()
-    ie.current_env().track_user_pipelines()
+    ie.current_env().add_user_pipeline(self.p)
     ie.current_env().set_pipeline_result(self.p, self.mock_result)
     ie.current_env().set_cache_manager(self.cache, self.p)
 
@@ -207,11 +207,7 @@
 
     # Create a recording.
     recording = Recording(
-        p, [elems],
-        mock_result,
-        pi.PipelineInstrument(p),
-        max_n=10,
-        max_duration_secs=60)
+        p, [elems], mock_result, max_n=10, max_duration_secs=60)
 
     # The background caching job and the recording isn't done yet so there may
     # be more elements to be recorded.
@@ -235,11 +231,7 @@
     bcj_mock_result.set_state(PipelineState.DONE)
     ie.current_env().set_background_caching_job(p, background_caching_job)
     recording = Recording(
-        p, [elems],
-        mock_result,
-        pi.PipelineInstrument(p),
-        max_n=10,
-        max_duration_secs=60)
+        p, [elems], mock_result, max_n=10, max_duration_secs=60)
     recording.wait_until_finish()
 
     # There are no more elements and the recording finished, meaning that the
@@ -267,11 +259,7 @@
 
     # Create a recording with an arbitrary start time.
     recording = Recording(
-        p, [numbers, letters],
-        mock_result,
-        pi.PipelineInstrument(p),
-        max_n=10,
-        max_duration_secs=60)
+        p, [numbers, letters], mock_result, max_n=10, max_duration_secs=60)
 
     # Get the cache key of the stream and write something to cache. This is
     # so that a pipeline doesn't have to run in the test.
@@ -422,9 +410,6 @@
     # was run.
     rm = RecordingManager(p)
 
-    # Get the cache, key, and coder to read the PCollection from the cache.
-    pipeline_instrument = pi.PipelineInstrument(p)
-
     # Set up a mock for the Cache's clear function which will be used to clear
     # uncomputed PCollections.
     rm._clear_pcolls = MagicMock()
@@ -434,7 +419,9 @@
     # Assert that the cache cleared the PCollection.
     rm._clear_pcolls.assert_any_call(
         unittest.mock.ANY,
-        set(pipeline_instrument.cache_key(pc) for pc in (elems, squares)))
+        # elems is an unbounded source populated by the background job, thus
+        # it is not cleared.
+        {CacheKey.from_pcoll('squares', squares).to_str()})
 
   def test_clear(self):
     p1 = beam.Pipeline(InteractiveRunner())
diff --git a/sdks/python/apache_beam/runners/interactive/sql/beam_sql_magics.py b/sdks/python/apache_beam/runners/interactive/sql/beam_sql_magics.py
index 1dc42e0..6c0f8d3 100644
--- a/sdks/python/apache_beam/runners/interactive/sql/beam_sql_magics.py
+++ b/sdks/python/apache_beam/runners/interactive/sql/beam_sql_magics.py
@@ -30,18 +30,20 @@
 
 import apache_beam as beam
 from apache_beam.pvalue import PValue
-from apache_beam.runners.interactive import cache_manager as cache
 from apache_beam.runners.interactive import interactive_beam as ib
 from apache_beam.runners.interactive import interactive_environment as ie
-from apache_beam.runners.interactive import pipeline_instrument as inst
-from apache_beam.runners.interactive.cache_manager import FileBasedCacheManager
-from apache_beam.runners.interactive.caching.streaming_cache import StreamingCache
+from apache_beam.runners.interactive.background_caching_job import has_source_to_cache
+from apache_beam.runners.interactive.caching.cacheable import CacheKey
+from apache_beam.runners.interactive.caching.reify import reify_to_cache
+from apache_beam.runners.interactive.caching.reify import unreify_from_cache
+from apache_beam.runners.interactive.display.pcoll_visualization import visualize_computed_pcoll
 from apache_beam.runners.interactive.sql.utils import find_pcolls
 from apache_beam.runners.interactive.sql.utils import is_namedtuple
-from apache_beam.runners.interactive.sql.utils import pcolls_by_name
+from apache_beam.runners.interactive.sql.utils import pformat_namedtuple
 from apache_beam.runners.interactive.sql.utils import register_coder_for_schema
 from apache_beam.runners.interactive.sql.utils import replace_single_pcoll_token
 from apache_beam.runners.interactive.utils import obfuscate
+from apache_beam.runners.interactive.utils import pcoll_by_name
 from apache_beam.runners.interactive.utils import progress_indicated
 from apache_beam.testing import test_stream
 from apache_beam.testing.test_stream_service import TestStreamServiceController
@@ -66,6 +68,19 @@
     depending on the SQL statement executed.
 """
 
+_NOT_SUPPORTED_MSG = """The query was valid and successfully applied.
+    But beam_sql failed to execute the query: %s
+
+    Runner used by beam_sql was %s.
+    Some Beam features might have not been supported by the Python SDK and runner combination.
+    Please check the runner output for more details about the failed items.
+
+    In the meantime, you may check:
+    https://beam.apache.org/documentation/runners/capability-matrix/
+    to choose a runner other than the InteractiveRunner and explicitly apply SqlTransform
+    to build Beam pipelines in a non-interactive manner.
+"""
+
 
 def on_error(error_msg, *args):
   """Logs the error and the usage example."""
@@ -98,7 +113,7 @@
     if not cell or cell.isspace():
       on_error('Please supply the sql to be executed.')
       return
-    found = find_pcolls(cell, pcolls_by_name())
+    found = find_pcolls(cell, pcoll_by_name())
     for _, pcoll in found.items():
       if not is_namedtuple(pcoll.element_type):
         on_error(
@@ -110,15 +125,15 @@
         return
       register_coder_for_schema(pcoll.element_type)
 
-    # TODO(BEAM-10708): implicitly execute the pipeline and write output into
-    # cache.
-    return apply_sql(cell, line, found)
+    output_name, output = apply_sql(cell, line, found)
+    cache_output(output_name, output)
+    return output
 
 
 @progress_indicated
 def apply_sql(
     query: str, output_name: Optional[str],
-    found: Dict[str, beam.PCollection]) -> PValue:
+    found: Dict[str, beam.PCollection]) -> Tuple[str, PValue]:
   """Applies a SqlTransform with the given sql and queried PCollections.
 
   Args:
@@ -127,7 +142,9 @@
     found: The PCollections with variable names found to be used in the query.
 
   Returns:
-    A PValue, mostly a PCollection, depending on the query.
+    A Tuple[str, PValue]. First str value is the output variable name in
+    __main__ module (auto-generated if not provided). Second PValue is
+    most likely a PCollection, depending on the query.
   """
   output_name = _generate_output_name(output_name, query, found)
   query, sql_source = _build_query_components(query, found)
@@ -138,53 +155,20 @@
     setattr(importlib.import_module('__main__'), output_name, output)
     ib.watch({output_name: output})
     _LOGGER.info(
-        "The output PCollection variable is %s: %s", output_name, output)
-    return output
+        "The output PCollection variable is %s with element_type %s",
+        output_name,
+        pformat_namedtuple(output.element_type))
+    return output_name, output
   except (KeyboardInterrupt, SystemExit):
     raise
   except Exception as e:
     on_error('Error when applying the Beam SQL: %s', e)
 
 
-def pcoll_from_file_cache(
-    query_pipeline: beam.Pipeline,
-    pcoll: beam.PCollection,
-    cache_manager: FileBasedCacheManager,
-    key: str) -> beam.PCollection:
-  """Reads PCollection cache from files.
-
-  Args:
-    query_pipeline: The beam.Pipeline object built by the magic to execute the
-        SQL query.
-    pcoll: The PCollection to read cache for.
-    cache_manager: The file based cache manager that holds the PCollection
-        cache.
-    key: The key of the PCollection cache.
-
-  Returns:
-    A PCollection read from the cache.
-  """
-  schema = pcoll.element_type
-
-  class Unreify(beam.DoFn):
-    def process(self, e):
-      if isinstance(e, beam.Row) and hasattr(e, 'windowed_value'):
-        yield e.windowed_value
-
-  return (
-      query_pipeline
-      |
-      '{}{}'.format('QuerySource', key) >> cache.ReadCache(cache_manager, key)
-      | '{}{}'.format('Unreify', key) >> beam.ParDo(
-          Unreify()).with_output_types(schema))
-
-
 def pcolls_from_streaming_cache(
     user_pipeline: beam.Pipeline,
     query_pipeline: beam.Pipeline,
-    name_to_pcoll: Dict[str, beam.PCollection],
-    instrumentation: inst.PipelineInstrument,
-    cache_manager: StreamingCache) -> Dict[str, beam.PCollection]:
+    name_to_pcoll: Dict[str, beam.PCollection]) -> Dict[str, beam.PCollection]:
   """Reads PCollection cache through the TestStream.
 
   Args:
@@ -193,9 +177,6 @@
     query_pipeline: The beam.Pipeline object built by the magic to execute the
         SQL query.
     name_to_pcoll: PCollections with variable names used in the SQL query.
-    instrumentation: A pipeline_instrument.PipelineInstrument that helps
-        calculate the cache key of a given PCollection.
-    cache_manager: The streaming cache manager that holds the PCollection cache.
 
   Returns:
     A Dict[str, beam.PCollection], where each PCollection is tagged with
@@ -208,6 +189,8 @@
     _LOGGER.error(str(e))
     return True
 
+  cache_manager = ie.current_env().get_cache_manager(
+      user_pipeline, create_if_absent=True)
   test_stream_service = ie.current_env().get_test_stream_service_controller(
       user_pipeline)
   if not test_stream_service:
@@ -219,7 +202,7 @@
 
   tag_to_name = {}
   for name, pcoll in name_to_pcoll.items():
-    key = instrumentation.cache_key(pcoll)
+    key = CacheKey.from_pcoll(name, pcoll).to_str()
     tag_to_name[key] = name
   output_pcolls = query_pipeline | test_stream.TestStream(
       output_tags=set(tag_to_name.keys()),
@@ -267,27 +250,54 @@
   """
   if found:
     user_pipeline = next(iter(found.values())).pipeline
-    cache_manager = ie.current_env().get_cache_manager(user_pipeline)
-    instrumentation = inst.build_pipeline_instrument(user_pipeline)
     sql_pipeline = beam.Pipeline(options=user_pipeline._options)
     ie.current_env().add_derived_pipeline(user_pipeline, sql_pipeline)
     sql_source = {}
-    if instrumentation.has_unbounded_sources:
+    if has_source_to_cache(user_pipeline):
       sql_source = pcolls_from_streaming_cache(
-          user_pipeline, sql_pipeline, found, instrumentation, cache_manager)
+          user_pipeline, sql_pipeline, found)
     else:
+      cache_manager = ie.current_env().get_cache_manager(
+          user_pipeline, create_if_absent=True)
       for pcoll_name, pcoll in found.items():
-        cache_key = instrumentation.cache_key(pcoll)
-        sql_source[pcoll_name] = pcoll_from_file_cache(
-            sql_pipeline, pcoll, cache_manager, cache_key)
+        cache_key = CacheKey.from_pcoll(pcoll_name, pcoll).to_str()
+        sql_source[pcoll_name] = unreify_from_cache(
+            pipeline=sql_pipeline,
+            cache_key=cache_key,
+            cache_manager=cache_manager,
+            element_type=pcoll.element_type)
     if len(sql_source) == 1:
       query = replace_single_pcoll_token(query, next(iter(sql_source.keys())))
       sql_source = next(iter(sql_source.values()))
   else:
     sql_source = beam.Pipeline()
+    ie.current_env().add_user_pipeline(sql_source)
   return query, sql_source
 
 
+@progress_indicated
+def cache_output(output_name: str, output: PValue) -> None:
+  """Reifies the output to cache, runs its pipeline and visualizes the data."""
+  env = ie.current_env()
+  user_pipeline = env.user_pipeline(output.pipeline)
+  if not user_pipeline:
+    _LOGGER.warning(
+        'Something is wrong with %s. Cannot introspect its data.', output)
+    return
+  cache_manager = env.get_cache_manager(user_pipeline, create_if_absent=True)
+  key = CacheKey.from_pcoll(output_name, output).to_str()
+  reify_to_cache(pcoll=output, cache_key=key, cache_manager=cache_manager)
+  try:
+    output.pipeline.run().wait_until_finish()
+  except (KeyboardInterrupt, SystemExit):
+    raise
+  except Exception as e:
+    _LOGGER.warning(_NOT_SUPPORTED_MSG, e, output.pipeline.runner)
+    return
+  env.mark_pcollection_computed([output])
+  visualize_computed_pcoll(output_name, output)
+
+
 def load_ipython_extension(ipython):
   """Marks this module as an IPython extension.
 
diff --git a/sdks/python/apache_beam/runners/interactive/sql/beam_sql_magics_test.py b/sdks/python/apache_beam/runners/interactive/sql/beam_sql_magics_test.py
index d35bd46..7c4de77 100644
--- a/sdks/python/apache_beam/runners/interactive/sql/beam_sql_magics_test.py
+++ b/sdks/python/apache_beam/runners/interactive/sql/beam_sql_magics_test.py
@@ -27,10 +27,13 @@
 import apache_beam as beam
 from apache_beam.runners.interactive import interactive_beam as ib
 from apache_beam.runners.interactive import interactive_environment as ie
+from apache_beam.runners.interactive.cache_manager import FileBasedCacheManager
+from apache_beam.runners.interactive.caching.cacheable import CacheKey
 
 try:
   from apache_beam.runners.interactive.sql.beam_sql_magics import _build_query_components
   from apache_beam.runners.interactive.sql.beam_sql_magics import _generate_output_name
+  from apache_beam.runners.interactive.sql.beam_sql_magics import cache_output
 except (ImportError, NameError):
   pass  # The test is to be skipped because [interactive] dep not installed.
 
@@ -67,11 +70,11 @@
     found = {'target': target}
 
     with patch('apache_beam.runners.interactive.sql.beam_sql_magics.'
-               'pcoll_from_file_cache',
-               lambda a,
-               b,
-               c,
-               d: target):
+               'unreify_from_cache',
+               lambda pipeline,
+               cache_key,
+               cache_manager,
+               element_type: target):
       processed_query, sql_source = _build_query_components(query, found)
 
       self.assertEqual(processed_query, 'SELECT * FROM PCOLLECTION where a=1')
@@ -86,11 +89,11 @@
     found = {'pcoll_1': pcoll_1, 'pcoll_2': pcoll_2}
 
     with patch('apache_beam.runners.interactive.sql.beam_sql_magics.'
-               'pcoll_from_file_cache',
-               lambda a,
-               b,
-               c,
-               d: pcoll_1):
+               'unreify_from_cache',
+               lambda pipeline,
+               cache_key,
+               cache_manager,
+               element_type: pcoll_1):
       processed_query, sql_source = _build_query_components(query, found)
 
       self.assertEqual(processed_query, query)
@@ -110,12 +113,26 @@
                'pcolls_from_streaming_cache',
                lambda a,
                b,
-               c,
-               d,
-               e: found):
+               c: found):
       _, sql_source = _build_query_components(query, found)
       self.assertIs(sql_source, pcoll)
 
+  def test_cache_output(self):
+    p_cache_output = beam.Pipeline()
+    pcoll_co = p_cache_output | 'Create Source' >> beam.Create([1, 2, 3])
+    cache_manager = FileBasedCacheManager()
+    ie.current_env().set_cache_manager(cache_manager, p_cache_output)
+    ib.watch(locals())
+    with patch('apache_beam.runners.interactive.sql.beam_sql_magics.'
+               'visualize_computed_pcoll',  # Patched where it is looked up.
+               lambda a,
+               b: None):
+      cache_output('pcoll_co', pcoll_co)
+      self.assertIn(pcoll_co, ie.current_env().computed_pcollections)
+      self.assertTrue(
+          cache_manager.exists(
+              'full', CacheKey.from_pcoll('pcoll_co', pcoll_co).to_str()))
+
 
 if __name__ == '__main__':
   unittest.main()
diff --git a/sdks/python/apache_beam/runners/interactive/sql/utils.py b/sdks/python/apache_beam/runners/interactive/sql/utils.py
index 355b6e6..0c80505 100644
--- a/sdks/python/apache_beam/runners/interactive/sql/utils.py
+++ b/sdks/python/apache_beam/runners/interactive/sql/utils.py
@@ -28,7 +28,6 @@
 
 import apache_beam as beam
 from apache_beam.runners.interactive import interactive_beam as ib
-from apache_beam.runners.interactive import interactive_environment as ie
 
 _LOGGER = logging.getLogger(__name__)
 
@@ -59,17 +58,6 @@
     beam.coders.registry.register_coder(schema, beam.coders.RowCoder)
 
 
-def pcolls_by_name() -> Dict[str, beam.PCollection]:
-  """Finds all PCollections by their variable names defined in the notebook."""
-  inspectables = ie.current_env().inspector.inspectables
-  pcolls = {}
-  for _, inspectable in inspectables.items():
-    metadata = inspectable['metadata']
-    if metadata['type'] == 'pcollection':
-      pcolls[metadata['name']] = inspectable['value']
-  return pcolls
-
-
 def find_pcolls(
     sql: str, pcolls: Dict[str,
                            beam.PCollection]) -> Dict[str, beam.PCollection]:
@@ -100,7 +88,6 @@
             name,
             sql)
         raise
-    _LOGGER.info('Done collecting data.')
   return found
 
 
@@ -123,3 +110,12 @@
     if token_location < len(words) and words[token_location] == pcoll_name:
       words[token_location] = 'PCOLLECTION'
   return ' '.join(words)
+
+
+def pformat_namedtuple(schema: NamedTuple) -> str:
+  """Formats a NamedTuple type as 'Name(field: type, ...)'."""
+  # _field_types was removed in Python 3.9; typing generics may lack __name__.
+  fields = ', '.join(
+      '{}: {}'.format(k, getattr(v, '__name__', repr(v)))
+      for k, v in schema.__annotations__.items())
+  return '{}({})'.format(schema.__name__, fields)
diff --git a/sdks/python/apache_beam/runners/interactive/sql/utils_test.py b/sdks/python/apache_beam/runners/interactive/sql/utils_test.py
index ed52cad..01a54c3 100644
--- a/sdks/python/apache_beam/runners/interactive/sql/utils_test.py
+++ b/sdks/python/apache_beam/runners/interactive/sql/utils_test.py
@@ -24,10 +24,9 @@
 from unittest.mock import patch
 
 import apache_beam as beam
-from apache_beam.runners.interactive import interactive_beam as ib
 from apache_beam.runners.interactive.sql.utils import find_pcolls
 from apache_beam.runners.interactive.sql.utils import is_namedtuple
-from apache_beam.runners.interactive.sql.utils import pcolls_by_name
+from apache_beam.runners.interactive.sql.utils import pformat_namedtuple
 from apache_beam.runners.interactive.sql.utils import register_coder_for_schema
 from apache_beam.runners.interactive.sql.utils import replace_single_pcoll_token
 
@@ -58,14 +57,6 @@
     self.assertIsInstance(
         beam.coders.registry.get_coder(ANamedTuple), beam.coders.RowCoder)
 
-  def test_pcolls_by_name(self):
-    p = beam.Pipeline()
-    pcoll = p | beam.Create([1])
-    ib.watch({'p': p, 'pcoll': pcoll})
-
-    name_to_pcoll = pcolls_by_name()
-    self.assertIn('pcoll', name_to_pcoll)
-
   def test_find_pcolls(self):
     with patch('apache_beam.runners.interactive.interactive_beam.collect',
                lambda _: None):
@@ -85,6 +76,10 @@
     self.assertEqual(
         replaced_sql, 'SELECT * FROM PCOLLECTION WHERE a=1 AND b=2')
 
+  def test_pformat_namedtuple(self):
+    formatted = pformat_namedtuple(ANamedTuple)
+    self.assertEqual('ANamedTuple(a: int, b: str)', formatted)
+
 
 if __name__ == '__main__':
   unittest.main()
diff --git a/sdks/python/apache_beam/runners/interactive/testing/integration/goldens/Darwin/29c9237ddf4f3d5988a503069b4d3c47.png b/sdks/python/apache_beam/runners/interactive/testing/integration/goldens/Darwin/29c9237ddf4f3d5988a503069b4d3c47.png
index 8463b3f..b6af3bd 100644
--- a/sdks/python/apache_beam/runners/interactive/testing/integration/goldens/Darwin/29c9237ddf4f3d5988a503069b4d3c47.png
+++ b/sdks/python/apache_beam/runners/interactive/testing/integration/goldens/Darwin/29c9237ddf4f3d5988a503069b4d3c47.png
Binary files differ
diff --git a/sdks/python/apache_beam/runners/interactive/testing/integration/goldens/Darwin/7a35f487b2a5f3a9b9852a8659eeb4bd.png b/sdks/python/apache_beam/runners/interactive/testing/integration/goldens/Darwin/7a35f487b2a5f3a9b9852a8659eeb4bd.png
index 2179619..cbb27ed 100644
--- a/sdks/python/apache_beam/runners/interactive/testing/integration/goldens/Darwin/7a35f487b2a5f3a9b9852a8659eeb4bd.png
+++ b/sdks/python/apache_beam/runners/interactive/testing/integration/goldens/Darwin/7a35f487b2a5f3a9b9852a8659eeb4bd.png
Binary files differ
diff --git a/sdks/python/apache_beam/runners/interactive/user_pipeline_tracker.py b/sdks/python/apache_beam/runners/interactive/user_pipeline_tracker.py
index 432e3d7..53ee54a 100644
--- a/sdks/python/apache_beam/runners/interactive/user_pipeline_tracker.py
+++ b/sdks/python/apache_beam/runners/interactive/user_pipeline_tracker.py
@@ -24,6 +24,7 @@
 that derived pipelines can link back to the parent user pipeline.
 """
 
+import shutil
 from typing import Iterator
 from typing import Optional
 
@@ -66,6 +67,10 @@
 
   def clear(self) -> None:
     """Clears the tracker of all user and derived pipelines."""
+    # Remove all local_tempdir of created pipelines.
+    for p in self._pid_to_pipelines.values():
+      shutil.rmtree(p.local_tempdir, ignore_errors=True)
+
     self._user_pipelines.clear()
     self._derived_pipelines.clear()
     self._pid_to_pipelines.clear()
diff --git a/sdks/python/apache_beam/runners/interactive/utils.py b/sdks/python/apache_beam/runners/interactive/utils.py
index cb0b7db..2c75cc9 100644
--- a/sdks/python/apache_beam/runners/interactive/utils.py
+++ b/sdks/python/apache_beam/runners/interactive/utils.py
@@ -22,12 +22,16 @@
 import hashlib
 import json
 import logging
+from typing import Dict
 
 import pandas as pd
 
+import apache_beam as beam
 from apache_beam.dataframe.convert import to_pcollection
 from apache_beam.dataframe.frame_base import DeferredBase
 from apache_beam.portability.api.beam_runner_api_pb2 import TestStreamPayload
+from apache_beam.runners.interactive.caching.cacheable import Cacheable
+from apache_beam.runners.interactive.caching.cacheable import CacheKey
 from apache_beam.runners.interactive.caching.expression_cache import ExpressionCache
 from apache_beam.testing.test_stream import WindowedValueHolder
 from apache_beam.typehints.schemas import named_fields_from_element_type
@@ -294,3 +298,96 @@
 
   proxy = df._expr.proxy()
   return to_pcollection(df, yield_elements='pandas', label=str(df._expr)), proxy
+
+
+def pcoll_by_name() -> Dict[str, beam.PCollection]:
+  """Maps variable names defined in the notebook to their PCollections."""
+  # Imported at call time rather than at module load.
+  from apache_beam.runners.interactive import interactive_environment as ie
+
+  watched = ie.current_env().inspector_with_synthetic.inspectables.values()
+  # Keep only the inspectables whose metadata marks them as a PCollection.
+  return {
+      entry['metadata']['name']: entry['value']
+      for entry in watched if entry['metadata']['type'] == 'pcollection'
+  }
+
+
+def cacheables() -> Dict[CacheKey, Cacheable]:
+  """Builds a Cacheable for every watched PCollection, keyed by CacheKey."""
+  from apache_beam.runners.interactive import interactive_environment as ie
+
+  key_to_cacheable = {}
+  watched = ie.current_env().inspector_with_synthetic.inspectables.values()
+  for entry in watched:
+    if entry['metadata']['type'] != 'pcollection':
+      continue
+    c = Cacheable.from_pcoll(entry['metadata']['name'], entry['value'])
+    key_to_cacheable[c.to_key()] = c
+  return key_to_cacheable
+
+
+def watch_sources(pipeline):
+  """Watches outputs of recordable sources that no variable refers to yet.
+
+  A source can output to a PCollection that is not assigned to any user
+  variable, and such a PCollection is not cached. To still cache its data, a
+  synthetic variable referring to that PCollection is created and watched.
+  """
+  from apache_beam.pipeline import PipelineVisitor
+  from apache_beam.runners.interactive import interactive_environment as ie
+
+  user_pipeline = ie.current_env().user_pipeline(pipeline)
+  # Snapshot of the PCollections that already have a variable name.
+  watched_pcolls = set(pcoll_by_name().values())
+
+  class CacheableUnboundedPCollectionVisitor(PipelineVisitor):
+    def __init__(self):
+      self.unbounded_pcolls = set()
+
+    def enter_composite_transform(self, transform_node):
+      self.visit_transform(transform_node)
+
+    def visit_transform(self, transform_node):
+      sources = tuple(ie.current_env().options.recordable_sources)
+      if not isinstance(transform_node.transform, sources):
+        return
+      for pcoll in transform_node.outputs.values():
+        # Skip outputs the user already assigned to a variable: watching them
+        # again under a different name creates ambiguity.
+        if pcoll not in watched_pcolls:
+          ie.current_env().watch({'synthetic_var_' + str(id(pcoll)): pcoll})
+
+  user_pipeline.visit(CacheableUnboundedPCollectionVisitor())
+
+
+def has_unbounded_sources(pipeline):
+  """Returns True if the pipeline has at least one recordable source."""
+  return bool(unbounded_sources(pipeline))
+
+
+def unbounded_sources(pipeline):
+  """Lists the pipeline's transform nodes that are recordable sources."""
+  from apache_beam.pipeline import PipelineVisitor
+  from apache_beam.runners.interactive import interactive_environment as ie
+
+  class CheckUnboundednessVisitor(PipelineVisitor):
+    """Collects every visited node whose transform is a recordable source.
+
+    Composite transforms are checked too, not only the leaf transforms.
+    """
+    def __init__(self):
+      self.unbounded_sources = []
+
+    def enter_composite_transform(self, transform_node):
+      self.visit_transform(transform_node)
+
+    def visit_transform(self, transform_node):
+      # The recordable sources are looked up from the options on each visit.
+      recordable = tuple(ie.current_env().options.recordable_sources)
+      if isinstance(transform_node.transform, recordable):
+        self.unbounded_sources.append(transform_node)
+
+  visitor = CheckUnboundednessVisitor()
+  pipeline.visit(visitor)
+  return visitor.unbounded_sources
diff --git a/sdks/python/apache_beam/runners/interactive/utils_test.py b/sdks/python/apache_beam/runners/interactive/utils_test.py
index ecbba30..5929c8e 100644
--- a/sdks/python/apache_beam/runners/interactive/utils_test.py
+++ b/sdks/python/apache_beam/runners/interactive/utils_test.py
@@ -17,6 +17,7 @@
 
 import json
 import logging
+import tempfile
 import unittest
 from typing import NamedTuple
 from unittest.mock import PropertyMock
@@ -30,9 +31,12 @@
 from apache_beam import coders
 from apache_beam.dataframe.convert import to_dataframe
 from apache_beam.portability.api.beam_runner_api_pb2 import TestStreamPayload
+from apache_beam.runners.interactive import interactive_beam as ib
 from apache_beam.runners.interactive import interactive_environment as ie
 from apache_beam.runners.interactive import utils
+from apache_beam.runners.interactive.caching.cacheable import Cacheable
 from apache_beam.runners.interactive.testing.mock_ipython import mock_get_ipython
+from apache_beam.runners.interactive.testing.test_cache_manager import InMemoryCache
 from apache_beam.testing.test_stream import WindowedValueHolder
 from apache_beam.utils.timestamp import Timestamp
 from apache_beam.utils.windowed_value import WindowedValue
@@ -272,5 +276,39 @@
     self.assertEqual(json.loads(dummy()), MessagingUtilTest.SAMPLE_DATA)
 
 
+class GeneralUtilTest(unittest.TestCase):
+  """Tests for the PCollection lookup and source detection helpers."""
+  def test_pcoll_by_name(self):
+    p = beam.Pipeline()
+    pcoll = p | beam.Create([1])
+    ib.watch({'p': p, 'pcoll': pcoll})
+    self.assertIn('pcoll', utils.pcoll_by_name())
+
+  def test_cacheables(self):
+    p2 = beam.Pipeline()
+    pcoll2 = p2 | beam.Create([2])
+    ib.watch({'p2': p2, 'pcoll2': pcoll2})
+    expected_key = Cacheable.from_pcoll('pcoll2', pcoll2).to_key()
+    self.assertIn(expected_key, utils.cacheables())
+
+  def test_has_unbounded_source(self):
+    p = beam.Pipeline()
+    ie.current_env().set_cache_manager(InMemoryCache(), p)
+    _ = p | 'ReadUnboundedSource' >> beam.io.ReadFromPubSub(
+        subscription='projects/fake-project/subscriptions/fake_sub')
+    self.assertTrue(utils.has_unbounded_sources(p))
+
+  def test_not_has_unbounded_source(self):
+    p = beam.Pipeline()
+    ie.current_env().set_cache_manager(InMemoryCache(), p)
+    # The file only has to exist while the read transform is applied, so let
+    # the context manager delete it instead of leaking it with delete=False.
+    with tempfile.NamedTemporaryFile() as f:
+      f.write(b'test')
+      f.flush()
+      _ = p | 'ReadBoundedSource' >> beam.io.ReadFromText(f.name)
+    self.assertFalse(utils.has_unbounded_sources(p))
+
+
 if __name__ == '__main__':
   unittest.main()
diff --git a/sdks/python/setup.py b/sdks/python/setup.py
index a71e1da..514f4e7 100644
--- a/sdks/python/setup.py
+++ b/sdks/python/setup.py
@@ -219,7 +219,7 @@
     # headless chrome based integration tests
     'selenium>=3.141.0,<4',
     'needle>=0.5.0,<1',
-    'chromedriver-binary>=91,<92',
+    'chromedriver-binary>=93,<94',
     # use a fixed major version of PIL for different python versions
     'pillow>=7.1.1,<8',
 ]