| # |
| # Licensed to the Apache Software Foundation (ASF) under one or more |
| # contributor license agreements. See the NOTICE file distributed with |
| # this work for additional information regarding copyright ownership. |
| # The ASF licenses this file to You under the Apache License, Version 2.0 |
| # (the "License"); you may not use this file except in compliance with |
| # the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| |
| """Integration tests for Recommendations AI transforms.""" |
| |
| from __future__ import absolute_import |
| |
| import random |
| import unittest |
| |
| from nose.plugins.attrib import attr |
| |
| import apache_beam as beam |
| from apache_beam.testing.test_pipeline import TestPipeline |
| from apache_beam.testing.util import assert_that |
| from apache_beam.testing.util import equal_to |
| from apache_beam.testing.util import is_not_empty |
| |
| # pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports |
| try: |
| from google.cloud import recommendationengine |
| from apache_beam.ml.gcp import recommendations_ai |
| except ImportError: |
| recommendationengine = None |
| # pylint: enable=wrong-import-order, wrong-import-position, ungrouped-imports |
| |
| |
| def extract_id(response): |
| yield response["id"] |
| |
| |
| def extract_event_type(response): |
| yield response["event_type"] |
| |
| |
| def extract_prediction(response): |
| yield response[0]["results"] |
| |
| |
| @attr('IT') |
| @unittest.skipIf( |
| recommendationengine is None, |
| "Recommendations AI dependencies not installed.") |
| class RecommendationAIIT(unittest.TestCase): |
| def test_create_catalog_item(self): |
| |
| CATALOG_ITEM = { |
| "id": str(int(random.randrange(100000))), |
| "title": "Sample laptop", |
| "description": "Indisputably the most fantastic laptop ever created.", |
| "language_code": "en", |
| "category_hierarchies": [{ |
| "categories": ["Electronic", "Computers"] |
| }] |
| } |
| |
| with TestPipeline(is_integration_test=True) as p: |
| output = ( |
| p | 'Create data' >> beam.Create([CATALOG_ITEM]) |
| | 'Create CatalogItem' >> |
| recommendations_ai.CreateCatalogItem(project=p.get_option('project')) |
| | beam.ParDo(extract_id) | beam.combiners.ToList()) |
| |
| assert_that(output, equal_to([[CATALOG_ITEM["id"]]])) |
| |
| def test_create_user_event(self): |
| USER_EVENT = {"event_type": "page-visit", "user_info": {"visitor_id": "1"}} |
| |
| with TestPipeline(is_integration_test=True) as p: |
| output = ( |
| p | 'Create data' >> beam.Create([USER_EVENT]) | 'Create UserEvent' >> |
| recommendations_ai.WriteUserEvent(project=p.get_option('project')) |
| | beam.ParDo(extract_event_type) | beam.combiners.ToList()) |
| |
| assert_that(output, equal_to([[USER_EVENT["event_type"]]])) |
| |
| def test_create_predict(self): |
| USER_EVENT = {"event_type": "page-visit", "user_info": {"visitor_id": "1"}} |
| |
| with TestPipeline(is_integration_test=True) as p: |
| output = ( |
| p | 'Create data' >> beam.Create([USER_EVENT]) |
| | 'Predict UserEvent' >> recommendations_ai.PredictUserEvent( |
| project=p.get_option('project'), |
| placement_id="recently_viewed_default") |
| | beam.ParDo(extract_prediction)) |
| |
| assert_that(output, is_not_empty()) |
| |
| |
| if __name__ == '__main__': |
| print(recommendationengine.CatalogItem.__module__) |
| unittest.main() |