Revert "Merge pull request #13617 from [BEAM-11289] [Python] Integrate Google Cloud Recommendations AI functionality"

This reverts commit d9da8a4dc818b01a86d2dce2e78c0d78b47038bb.
diff --git a/sdks/python/apache_beam/ml/gcp/recommendations_ai.py b/sdks/python/apache_beam/ml/gcp/recommendations_ai.py
deleted file mode 100644
index 6e50cd2..0000000
--- a/sdks/python/apache_beam/ml/gcp/recommendations_ai.py
+++ /dev/null
@@ -1,585 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""A connector for sending API requests to the GCP Recommendations AI
-API (https://cloud.google.com/recommendations).
-"""
-
-from __future__ import absolute_import
-
-from typing import Sequence
-from typing import Tuple
-
-from google.api_core.retry import Retry
-
-from apache_beam import pvalue
-from apache_beam.metrics import Metrics
-from apache_beam.options.pipeline_options import GoogleCloudOptions
-from apache_beam.transforms import DoFn
-from apache_beam.transforms import ParDo
-from apache_beam.transforms import PTransform
-from apache_beam.transforms.util import GroupIntoBatches
-from cachetools.func import ttl_cache
-
-# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
-try:
-  from google.cloud import recommendationengine
-except ImportError:
-  raise ImportError(
-      'Google Cloud Recommendations AI not supported for this execution '
-      'environment (could not import google.cloud.recommendationengine).')
-# pylint: enable=wrong-import-order, wrong-import-position, ungrouped-imports
-
-__all__ = [
-    'CreateCatalogItem',
-    'WriteUserEvent',
-    'ImportCatalogItems',
-    'ImportUserEvents',
-    'PredictUserEvent'
-]
-
-FAILED_CATALOG_ITEMS = "failed_catalog_items"
-
-
-@ttl_cache(maxsize=128, ttl=3600)
-def get_recommendation_prediction_client():
-  """Returns a Recommendation AI - Prediction Service client."""
-  _client = recommendationengine.PredictionServiceClient()
-  return _client
-
-
-@ttl_cache(maxsize=128, ttl=3600)
-def get_recommendation_catalog_client():
-  """Returns a Recommendation AI - Catalog Service client."""
-  _client = recommendationengine.CatalogServiceClient()
-  return _client
-
-
-@ttl_cache(maxsize=128, ttl=3600)
-def get_recommendation_user_event_client():
-  """Returns a Recommendation AI - UserEvent Service client."""
-  _client = recommendationengine.UserEventServiceClient()
-  return _client
-
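-# The factory functions above are wrapped in ttl_cache so that DoFn
-# instances in the same worker process share one client per service; a
-# cached client is discarded and rebuilt after an hour (ttl=3600).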
-
-class CreateCatalogItem(PTransform):
-  """Creates catalogitem information.
-    The ``PTranform`` returns a PCollectionTuple with a PCollections of
-    successfully and failed created CatalogItems.
-
-    Example usage::
-
-      pipeline | CreateCatalogItem(
-        project='example-gcp-project',
-        catalog_name='my-catalog')
-    """
-  def __init__(
-      self,
-      project: str = None,
-      retry: Retry = None,
-      timeout: float = 120,
-      metadata: Sequence[Tuple[str, str]] = None,
-      catalog_name: str = "default_catalog"):
-    """Initializes a :class:`CreateCatalogItem` transform.
-
-        Args:
-            project (str): Optional. GCP project name in which the catalog
-              data will be imported.
-            retry: Optional. Designation of what
-              errors, if any, should be retried.
-            timeout (float): Optional. The amount of time, in seconds, to wait
-              for the request to complete.
-            metadata: Optional. Strings which
-              should be sent along with the request as metadata.
-            catalog_name (str): Optional. Name of the catalog.
-              Default: 'default_catalog'
-        """
-    self.project = project
-    self.retry = retry
-    self.timeout = timeout
-    self.metadata = metadata
-    self.catalog_name = catalog_name
-
-  def expand(self, pcoll):
-    if self.project is None:
-      self.project = pcoll.pipeline.options.view_as(GoogleCloudOptions).project
-    if self.project is None:
-      raise ValueError(
-          'GCP project name needs to be specified in "project" pipeline option')
-    return pcoll | ParDo(
-        _CreateCatalogItemFn(
-            self.project,
-            self.retry,
-            self.timeout,
-            self.metadata,
-            self.catalog_name))
-
-
-class _CreateCatalogItemFn(DoFn):
-  def __init__(
-      self,
-      project: str = None,
-      retry: Retry = None,
-      timeout: float = 120,
-      metadata: Sequence[Tuple[str, str]] = None,
-      catalog_name: str = None):
-    self._client = None
-    self.retry = retry
-    self.timeout = timeout
-    self.metadata = metadata
-    self.parent = f"projects/{project}/locations/global/catalogs/{catalog_name}"
-    self.counter = Metrics.counter(self.__class__, "api_calls")
-
-  def setup(self):
-    if self._client is None:
-      self._client = get_recommendation_catalog_client()
-
-  def process(self, element):
-    catalog_item = recommendationengine.CatalogItem(element)
-    request = recommendationengine.CreateCatalogItemRequest(
-        parent=self.parent, catalog_item=catalog_item)
-
-    try:
-      created_catalog_item = self._client.create_catalog_item(
-          request=request,
-          retry=self.retry,
-          timeout=self.timeout,
-          metadata=self.metadata)
-
-      self.counter.inc()
-      yield recommendationengine.CatalogItem.to_dict(created_catalog_item)
-    except Exception:
-      yield pvalue.TaggedOutput(
-          FAILED_CATALOG_ITEMS,
-          recommendationengine.CatalogItem.to_dict(catalog_item))
-
-
-class ImportCatalogItems(PTransform):
-  """Imports catalogitems in bulk.
-    The `PTransform` returns a PCollectionTuple with PCollections of
-    successfully and failed imported CatalogItems.
-
-    Example usage::
-
-      pipeline
-      | ImportCatalogItems(
-          project='example-gcp-project',
-          catalog_name='my-catalog')
-    """
-  def __init__(
-      self,
-      max_batch_size: int = 5000,
-      project: str = None,
-      retry: Retry = None,
-      timeout: float = 120,
-      metadata: Sequence[Tuple[str, str]] = None,
-      catalog_name: str = "default_catalog"):
-    """Initializes a :class:`ImportCatalogItems` transform
-
-        Args:
-            max_batch_size (int): Optional. Maximum number of catalog items
-              per request. Default: 5000.
-            project (str): Optional. GCP project name in which the catalog
-              data will be imported.
-            retry: Optional. Designation of what
-              errors, if any, should be retried.
-            timeout (float): Optional. The amount of time, in seconds, to wait
-              for the request to complete.
-            metadata: Optional. Strings which
-              should be sent along with the request as metadata.
-            catalog_name (str): Optional. Name of the catalog.
-              Default: 'default_catalog'
-        """
-    self.max_batch_size = max_batch_size
-    self.project = project
-    self.retry = retry
-    self.timeout = timeout
-    self.metadata = metadata
-    self.catalog_name = catalog_name
-
-  def expand(self, pcoll):
-    if self.project is None:
-      self.project = pcoll.pipeline.options.view_as(GoogleCloudOptions).project
-    if self.project is None:
-      raise ValueError(
-          'GCP project name needs to be specified in "project" pipeline option')
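-    # GroupIntoBatches requires keyed input: elements must be
-    # (key, catalog_item_dict) pairs, and each batch of at most
-    # max_batch_size items is sent as a single bulk-import request.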
-    return (
-        pcoll | GroupIntoBatches.WithShardedKey(self.max_batch_size) | ParDo(
-            _ImportCatalogItemsFn(
-                self.project,
-                self.retry,
-                self.timeout,
-                self.metadata,
-                self.catalog_name)))
-
-
-class _ImportCatalogItemsFn(DoFn):
-  def __init__(
-      self,
-      project=None,
-      retry=None,
-      timeout=120,
-      metadata=None,
-      catalog_name=None):
-    self._client = None
-    self.retry = retry
-    self.timeout = timeout
-    self.metadata = metadata
-    self.parent = f"projects/{project}/locations/global/catalogs/{catalog_name}"
-    self.counter = Metrics.counter(self.__class__, "api_calls")
-
-  def setup(self):
-    if self._client is None:
-      self._client = get_recommendation_catalog_client()
-
-  def process(self, element):
-    catalog_items = [recommendationengine.CatalogItem(e) for e in element[1]]
-    catalog_inline_source = recommendationengine.CatalogInlineSource(
-        {"catalog_items": catalog_items})
-    input_config = recommendationengine.InputConfig(
-        catalog_inline_source=catalog_inline_source)
-
-    request = recommendationengine.ImportCatalogItemsRequest(
-        parent=self.parent, input_config=input_config)
-
-    try:
-      operation = self._client.import_catalog_items(
-          request=request,
-          retry=self.retry,
-          timeout=self.timeout,
-          metadata=self.metadata)
-      self.counter.inc(len(catalog_items))
-      yield operation.result()
-    except Exception:
-      yield pvalue.TaggedOutput(FAILED_CATALOG_ITEMS, catalog_items)
-
-
-class WriteUserEvent(PTransform):
-  """Write user event information.
-    The `PTransform` returns a PCollectionTuple with PCollections of
-    successfully and failed written UserEvents.
-
-    Example usage::
-
-      pipeline
-      | WriteUserEvent(
-          project='example-gcp-project',
-          catalog_name='my-catalog',
-          event_store='my_event_store')
-    """
-  def __init__(
-      self,
-      project: str = None,
-      retry: Retry = None,
-      timeout: float = 120,
-      metadata: Sequence[Tuple[str, str]] = None,
-      catalog_name: str = "default_catalog",
-      event_store: str = "default_event_store"):
-    """Initializes a :class:`WriteUserEvent` transform.
-
-        Args:
-            project (str): Optional. GCP project name in which the catalog
-              data will be imported.
-            retry: Optional. Designation of what
-              errors, if any, should be retried.
-            timeout (float): Optional. The amount of time, in seconds, to wait
-              for the request to complete.
-            metadata: Optional. Strings which
-              should be sent along with the request as metadata.
-            catalog_name (str): Optional. Name of the catalog.
-              Default: 'default_catalog'
-            event_store (str): Optional. Name of the event store.
-              Default: 'default_event_store'
-        """
-    self.project = project
-    self.retry = retry
-    self.timeout = timeout
-    self.metadata = metadata
-    self.catalog_name = catalog_name
-    self.event_store = event_store
-
-  def expand(self, pcoll):
-    if self.project is None:
-      self.project = pcoll.pipeline.options.view_as(GoogleCloudOptions).project
-    if self.project is None:
-      raise ValueError(
-          'GCP project name needs to be specified in "project" pipeline option')
-    return pcoll | ParDo(
-        _WriteUserEventFn(
-            self.project,
-            self.retry,
-            self.timeout,
-            self.metadata,
-            self.catalog_name,
-            self.event_store))
-
-
-class _WriteUserEventFn(DoFn):
-  FAILED_USER_EVENTS = "failed_user_events"
-
-  def __init__(
-      self,
-      project=None,
-      retry=None,
-      timeout=120,
-      metadata=None,
-      catalog_name=None,
-      event_store=None):
-    self._client = None
-    self.retry = retry
-    self.timeout = timeout
-    self.metadata = metadata
-    self.parent = f"projects/{project}/locations/global/catalogs/"\
-                  f"{catalog_name}/eventStores/{event_store}"
-    self.counter = Metrics.counter(self.__class__, "api_calls")
-
-  def setup(self):
-    if self._client is None:
-      self._client = get_recommendation_user_event_client()
-
-  def process(self, element):
-    user_event = recommendationengine.UserEvent(element)
-    request = recommendationengine.WriteUserEventRequest(
-        parent=self.parent, user_event=user_event)
-
-    try:
-      created_user_event = self._client.write_user_event(
-          request=request,
-          retry=self.retry,
-          timeout=self.timeout,
-          metadata=self.metadata)
-      self.counter.inc()
-      yield recommendationengine.UserEvent.to_dict(created_user_event)
-    except Exception:
-      yield pvalue.TaggedOutput(
-          self.FAILED_USER_EVENTS,
-          recommendationengine.UserEvent.to_dict(user_event))
-
-
-class ImportUserEvents(PTransform):
-  """Imports userevents in bulk.
-    The `PTransform` returns a PCollectionTuple with PCollections of
-    successfully and failed imported UserEvents.
-
-    Example usage::
-
-      pipeline
-      | ImportUserEvents(
-          project='example-gcp-project',
-          catalog_name='my-catalog',
-          event_store='my_event_store')
-    """
-  def __init__(
-      self,
-      max_batch_size: int = 5000,
-      project: str = None,
-      retry: Retry = None,
-      timeout: float = 120,
-      metadata: Sequence[Tuple[str, str]] = None,
-      catalog_name: str = "default_catalog",
-      event_store: str = "default_event_store"):
-    """Initializes a :class:`WriteUserEvent` transform.
-
-        Args:
-            max_batch_size (int): Optional. Maximum number of user events
-              per request. Default: 5000.
-            project (str): Optional. GCP project name in which the catalog
-              data will be imported.
-            retry: Optional. Designation of what
-              errors, if any, should be retried.
-            timeout (float): Optional. The amount of time, in seconds, to wait
-              for the request to complete.
-            metadata: Optional. Strings which
-              should be sent along with the request as metadata.
-            catalog_name (str): Optional. Name of the catalog.
-              Default: 'default_catalog'
-            event_store (str): Optional. Name of the event store.
-              Default: 'default_event_store'
-        """
-    self.max_batch_size = max_batch_size
-    self.project = project
-    self.retry = retry
-    self.timeout = timeout
-    self.metadata = metadata
-    self.catalog_name = catalog_name
-    self.event_store = event_store
-
-  def expand(self, pcoll):
-    if self.project is None:
-      self.project = pcoll.pipeline.options.view_as(GoogleCloudOptions).project
-    if self.project is None:
-      raise ValueError(
-          'GCP project name needs to be specified in "project" pipeline option')
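-    # As with ImportCatalogItems, elements must be (key, user_event_dict)
-    # pairs so that GroupIntoBatches can assemble bulk-import requests.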
-    return (
-        pcoll | GroupIntoBatches.WithShardedKey(self.max_batch_size) | ParDo(
-            _ImportUserEventsFn(
-                self.project,
-                self.retry,
-                self.timeout,
-                self.metadata,
-                self.catalog_name,
-                self.event_store)))
-
-
-class _ImportUserEventsFn(DoFn):
-  FAILED_USER_EVENTS = "failed_user_events"
-
-  def __init__(
-      self,
-      project=None,
-      retry=None,
-      timeout=120,
-      metadata=None,
-      catalog_name=None,
-      event_store=None):
-    self._client = None
-    self.retry = retry
-    self.timeout = timeout
-    self.metadata = metadata
-    self.parent = f"projects/{project}/locations/global/catalogs/"\
-                  f"{catalog_name}/eventStores/{event_store}"
-    self.counter = Metrics.counter(self.__class__, "api_calls")
-
-  def setup(self):
-    if self._client is None:
-      self._client = get_recommendation_user_event_client()
-
-  def process(self, element):
-    user_events = [recommendationengine.UserEvent(e) for e in element[1]]
-    user_event_inline_source = recommendationengine.UserEventInlineSource(
-        {"user_events": user_events})
-    input_config = recommendationengine.InputConfig(
-        user_event_inline_source=user_event_inline_source)
-
-    request = recommendationengine.ImportUserEventsRequest(
-        parent=self.parent, input_config=input_config)
-
-    try:
-      operation = self._client.import_user_events(
-          request=request,
-          retry=self.retry,
-          timeout=self.timeout,
-          metadata=self.metadata)
-      self.counter.inc(len(user_events))
-      yield operation.result()
-    except Exception:
-      yield pvalue.TaggedOutput(self.FAILED_USER_EVENTS, user_events)
-
-
-class PredictUserEvent(PTransform):
-  """Make a recommendation prediction.
-    The `PTransform` returns a PCollection
-
-    Example usage::
-
-      pipeline
-      | PredictUserEvent(
-          project='example-gcp-project',
-          catalog_name='my-catalog',
-          event_store='my_event_store',
-          placement_id='recently_viewed_default')
-    """
-  def __init__(
-      self,
-      project: str = None,
-      retry: Retry = None,
-      timeout: float = 120,
-      metadata: Sequence[Tuple[str, str]] = None,
-      catalog_name: str = "default_catalog",
-      event_store: str = "default_event_store",
-      placement_id: str = None):
-    """Initializes a :class:`PredictUserEvent` transform.
-
-        Args:
-            project (str): Optional. GCP project name in which the catalog
-              data will be imported.
-            retry: Optional. Designation of what
-              errors, if any, should be retried.
-            timeout (float): Optional. The amount of time, in seconds, to wait
-              for the request to complete.
-            metadata: Optional. Strings which
-              should be sent along with the request as metadata.
-            catalog_name (str): Optional. Name of the catalog.
-              Default: 'default_catalog'
-            event_store (str): Optional. Name of the event store.
-              Default: 'default_event_store'
-            placement_id (str): Required. ID of the recommendation engine
-              placement. This id is used to identify the set of models that
-              will be used to make the prediction.
-        """
-    self.project = project
-    self.retry = retry
-    self.timeout = timeout
-    self.metadata = metadata
-    self.catalog_name = catalog_name
-    self.event_store = event_store
-    if placement_id is None:
-      raise ValueError('placement_id must be specified')
-    self.placement_id = placement_id
-
-  def expand(self, pcoll):
-    if self.project is None:
-      self.project = pcoll.pipeline.options.view_as(GoogleCloudOptions).project
-    if self.project is None:
-      raise ValueError(
-          'GCP project name needs to be specified in "project" pipeline option')
-    return pcoll | ParDo(
-        _PredictUserEventFn(
-            self.project,
-            self.retry,
-            self.timeout,
-            self.metadata,
-            self.catalog_name,
-            self.event_store,
-            self.placement_id))
-
-
-class _PredictUserEventFn(DoFn):
-  FAILED_PREDICTIONS = "failed_predictions"
-
-  def __init__(
-      self,
-      project=None,
-      retry=None,
-      timeout=120,
-      metadata=None,
-      catalog_name=None,
-      event_store=None,
-      placement_id=None):
-    self._client = None
-    self.retry = retry
-    self.timeout = timeout
-    self.metadata = metadata
-    self.name = f"projects/{project}/locations/global/catalogs/"\
-                f"{catalog_name}/eventStores/{event_store}/placements/"\
-                f"{placement_id}"
-    self.counter = Metrics.counter(self.__class__, "api_calls")
-
-  def setup(self):
-    if self._client is None:
-      self._client = get_recommendation_prediction_client()
-
-  def process(self, element):
-    user_event = recommendationengine.UserEvent(element)
-    request = recommendationengine.PredictRequest(
-        name=self.name, user_event=user_event)
-
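-    # predict() returns a pager; the pages are collected below so that each
-    # input user event yields a single list of prediction results.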
-    try:
-      prediction = self._client.predict(
-          request=request,
-          retry=self.retry,
-          timeout=self.timeout,
-          metadata=self.metadata)
-      self.counter.inc()
-      yield [
-          recommendationengine.PredictResponse.to_dict(p)
-          for p in prediction.pages
-      ]
-    except Exception:
-      yield pvalue.TaggedOutput(self.FAILED_PREDICTIONS, user_event)
diff --git a/sdks/python/apache_beam/ml/gcp/recommendations_ai_test.py b/sdks/python/apache_beam/ml/gcp/recommendations_ai_test.py
deleted file mode 100644
index 2f688d9..0000000
--- a/sdks/python/apache_beam/ml/gcp/recommendations_ai_test.py
+++ /dev/null
@@ -1,207 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""Unit tests for Recommendations AI transforms."""
-
-from __future__ import absolute_import
-
-import unittest
-
-import mock
-
-import apache_beam as beam
-from apache_beam.metrics import MetricsFilter
-
-# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
-try:
-  from google.cloud import recommendationengine
-  from apache_beam.ml.gcp import recommendations_ai
-except ImportError:
-  recommendationengine = None
-# pylint: enable=wrong-import-order, wrong-import-position, ungrouped-imports
-
-
-@unittest.skipIf(
-    recommendationengine is None,
-    "Recommendations AI dependencies not installed.")
-class RecommendationsAICatalogItemTest(unittest.TestCase):
-  def setUp(self):
-    self._mock_client = mock.Mock()
-    self._mock_client.create_catalog_item.return_value = (
-        recommendationengine.CatalogItem())
-    self.m2 = mock.Mock()
-    self.m2.result.return_value = None
-    self._mock_client.import_catalog_items.return_value = self.m2
-
-    self._catalog_item = {
-        "id": "12345",
-        "title": "Sample laptop",
-        "description": "Indisputably the most fantastic laptop ever created.",
-        "language_code": "en",
-        "category_hierarchies": [{
-            "categories": ["Electronic", "Computers"]
-        }]
-    }
-
-  def test_CreateCatalogItem(self):
-    expected_counter = 1
-    with mock.patch.object(recommendations_ai,
-                           'get_recommendation_catalog_client',
-                           return_value=self._mock_client):
-      p = beam.Pipeline()
-
-      _ = (
-          p | "Create data" >> beam.Create([self._catalog_item])
-          | "Create CatalogItem" >>
-          recommendations_ai.CreateCatalogItem(project="test"))
-
-      result = p.run()
-      result.wait_until_finish()
-
-      read_filter = MetricsFilter().with_name('api_calls')
-      query_result = result.metrics().query(read_filter)
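-      # Metric queries are not supported by every runner, so only assert
-      # on the counter when results are returned.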
-      if query_result['counters']:
-        read_counter = query_result['counters'][0]
-        self.assertEqual(read_counter.result, expected_counter)
-
-  def test_ImportCatalogItems(self):
-    expected_counter = 1
-    with mock.patch.object(recommendations_ai,
-                           'get_recommendation_catalog_client',
-                           return_value=self._mock_client):
-      p = beam.Pipeline()
-
-      _ = (
-          p | "Create data" >> beam.Create([
-              (self._catalog_item["id"], self._catalog_item),
-              (self._catalog_item["id"], self._catalog_item)
-          ]) | "Create CatalogItems" >>
-          recommendations_ai.ImportCatalogItems(project="test"))
-
-      result = p.run()
-      result.wait_until_finish()
-
-      read_filter = MetricsFilter().with_name('api_calls')
-      query_result = result.metrics().query(read_filter)
-      if query_result['counters']:
-        read_counter = query_result['counters'][0]
-        self.assertEqual(read_counter.result, expected_counter)
-
-
-@unittest.skipIf(
-    recommendationengine is None,
-    "Recommendations AI dependencies not installed.")
-class RecommendationsAIUserEventTest(unittest.TestCase):
-  def setUp(self):
-    self._mock_client = mock.Mock()
-    self._mock_client.write_user_event.return_value = (
-        recommendationengine.UserEvent())
-    self.m2 = mock.Mock()
-    self.m2.result.return_value = None
-    self._mock_client.import_user_events.return_value = self.m2
-
-    self._user_event = {
-        "event_type": "page-visit", "user_info": {
-            "visitor_id": "1"
-        }
-    }
-
-  def test_CreateUserEvent(self):
-    expected_counter = 1
-    with mock.patch.object(recommendations_ai,
-                           'get_recommendation_user_event_client',
-                           return_value=self._mock_client):
-      p = beam.Pipeline()
-
-      _ = (
-          p | "Create data" >> beam.Create([self._user_event])
-          | "Create UserEvent" >>
-          recommendations_ai.WriteUserEvent(project="test"))
-
-      result = p.run()
-      result.wait_until_finish()
-
-      read_filter = MetricsFilter().with_name('api_calls')
-      query_result = result.metrics().query(read_filter)
-      if query_result['counters']:
-        read_counter = query_result['counters'][0]
-        self.assertEqual(read_counter.result, expected_counter)
-
-  def test_ImportUserEvents(self):
-    expected_counter = 1
-    with mock.patch.object(recommendations_ai,
-                           'get_recommendation_user_event_client',
-                           return_value=self._mock_client):
-      p = beam.Pipeline()
-
-      _ = (
-          p | "Create data" >> beam.Create([
-              (self._user_event["user_info"]["visitor_id"], self._user_event),
-              (self._user_event["user_info"]["visitor_id"], self._user_event)
-          ]) | "Create UserEvents" >>
-          recommendations_ai.ImportUserEvents(project="test"))
-
-      result = p.run()
-      result.wait_until_finish()
-
-      read_filter = MetricsFilter().with_name('api_calls')
-      query_result = result.metrics().query(read_filter)
-      if query_result['counters']:
-        read_counter = query_result['counters'][0]
-        self.assertEqual(read_counter.result, expected_counter)
-
-
-@unittest.skipIf(
-    recommendationengine is None,
-    "Recommendations AI dependencies not installed.")
-class RecommendationsAIPredictTest(unittest.TestCase):
-  def setUp(self):
-    self._mock_client = mock.Mock()
-    self._mock_client.predict.return_value = [
-        recommendationengine.PredictResponse()
-    ]
-
-    self._user_event = {
-        "event_type": "page-visit", "user_info": {
-            "visitor_id": "1"
-        }
-    }
-
-  def test_Predict(self):
-    expected_counter = 1
-    with mock.patch.object(recommendations_ai,
-                           'get_recommendation_prediction_client',
-                           return_value=self._mock_client):
-      p = beam.Pipeline()
-
-      _ = (
-          p | "Create data" >> beam.Create([self._user_event])
-          | "Prediction UserEvents" >> recommendations_ai.PredictUserEvent(
-              project="test", placement_id="recently_viewed_default"))
-
-      result = p.run()
-      result.wait_until_finish()
-
-      read_filter = MetricsFilter().with_name('api_calls')
-      query_result = result.metrics().query(read_filter)
-      if query_result['counters']:
-        read_counter = query_result['counters'][0]
-        self.assertEqual(read_counter.result, expected_counter)
-
-
-if __name__ == '__main__':
-  unittest.main()
diff --git a/sdks/python/apache_beam/ml/gcp/recommendations_ai_test_it.py b/sdks/python/apache_beam/ml/gcp/recommendations_ai_test_it.py
deleted file mode 100644
index 19e6b9e..0000000
--- a/sdks/python/apache_beam/ml/gcp/recommendations_ai_test_it.py
+++ /dev/null
@@ -1,107 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""Integration tests for Recommendations AI transforms."""
-
-from __future__ import absolute_import
-
-import random
-import unittest
-
-from nose.plugins.attrib import attr
-
-import apache_beam as beam
-from apache_beam.testing.test_pipeline import TestPipeline
-from apache_beam.testing.util import assert_that
-from apache_beam.testing.util import equal_to
-from apache_beam.testing.util import is_not_empty
-
-# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
-try:
-  from google.cloud import recommendationengine
-  from apache_beam.ml.gcp import recommendations_ai
-except ImportError:
-  recommendationengine = None
-# pylint: enable=wrong-import-order, wrong-import-position, ungrouped-imports
-
-
-def extract_id(response):
-  yield response["id"]
-
-
-def extract_event_type(response):
-  yield response["event_type"]
-
-
-def extract_prediction(response):
-  yield response[0]["results"]
-
-
-@attr('IT')
-@unittest.skipIf(
-    recommendationengine is None,
-    "Recommendations AI dependencies not installed.")
-class RecommendationAIIT(unittest.TestCase):
-  def test_create_catalog_item(self):
-
-    CATALOG_ITEM = {
-        "id": str(int(random.randrange(100000))),
-        "title": "Sample laptop",
-        "description": "Indisputably the most fantastic laptop ever created.",
-        "language_code": "en",
-        "category_hierarchies": [{
-            "categories": ["Electronic", "Computers"]
-        }]
-    }
-
-    with TestPipeline(is_integration_test=True) as p:
-      output = (
-          p | 'Create data' >> beam.Create([CATALOG_ITEM])
-          | 'Create CatalogItem' >>
-          recommendations_ai.CreateCatalogItem(project=p.get_option('project'))
-          | beam.ParDo(extract_id) | beam.combiners.ToList())
-
-      assert_that(output, equal_to([[CATALOG_ITEM["id"]]]))
-
-  def test_create_user_event(self):
-    USER_EVENT = {"event_type": "page-visit", "user_info": {"visitor_id": "1"}}
-
-    with TestPipeline(is_integration_test=True) as p:
-      output = (
-          p | 'Create data' >> beam.Create([USER_EVENT]) | 'Create UserEvent' >>
-          recommendations_ai.WriteUserEvent(project=p.get_option('project'))
-          | beam.ParDo(extract_event_type) | beam.combiners.ToList())
-
-      assert_that(output, equal_to([[USER_EVENT["event_type"]]]))
-
-  def test_create_predict(self):
-    USER_EVENT = {"event_type": "page-visit", "user_info": {"visitor_id": "1"}}
-
-    with TestPipeline(is_integration_test=True) as p:
-      output = (
-          p | 'Create data' >> beam.Create([USER_EVENT])
-          | 'Predict UserEvent' >> recommendations_ai.PredictUserEvent(
-              project=p.get_option('project'),
-              placement_id="recently_viewed_default")
-          | beam.ParDo(extract_prediction))
-
-      assert_that(output, is_not_empty())
-
-
-if __name__ == '__main__':
-  print(recommendationengine.CatalogItem.__module__)
-  unittest.main()
diff --git a/sdks/python/setup.py b/sdks/python/setup.py
index 7117a55..775f321 100644
--- a/sdks/python/setup.py
+++ b/sdks/python/setup.py
@@ -200,7 +200,6 @@
     'google-cloud-language>=1.3.0,<2',
     'google-cloud-videointelligence>=1.8.0,<2',
     'google-cloud-vision>=0.38.0,<2',
-    'google-cloud-recommendations-ai>=0.1.0,<=0.2.0'
 ]
 
 INTERACTIVE_BEAM = [