blob: 19e6b9e8693f0d5b24361209f9cac9f10b279698 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Integration tests for Recommendations AI transforms."""
from __future__ import absolute_import
import random
import unittest
from nose.plugins.attrib import attr
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
from apache_beam.testing.util import is_not_empty
# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
try:
from google.cloud import recommendationengine
from apache_beam.ml.gcp import recommendations_ai
except ImportError:
recommendationengine = None
# pylint: enable=wrong-import-order, wrong-import-position, ungrouped-imports
def extract_id(response):
  """Yield the value stored under the "id" key of ``response``.

  Written as a generator so it can be handed directly to ``beam.ParDo``;
  the lookup only happens once the generator is iterated.
  """
  item_id = response["id"]
  yield item_id
def extract_event_type(response):
  """Yield the value stored under the "event_type" key of ``response``.

  Written as a generator so it can be handed directly to ``beam.ParDo``;
  the lookup only happens once the generator is iterated.
  """
  event_type = response["event_type"]
  yield event_type
def extract_prediction(response):
  """Yield the "results" entry of the first element of ``response``.

  ``response`` is indexed positionally and the first element is then read
  by key, so it must be a non-empty sequence whose first item has a
  "results" key. Written as a generator for use with ``beam.ParDo``.
  """
  first_entry = response[0]
  results = first_entry["results"]
  yield results
@attr('IT')
@unittest.skipIf(
    recommendationengine is None,
    "Recommendations AI dependencies not installed.")
class RecommendationAIIT(unittest.TestCase):
  """Integration tests for the Recommendations AI transforms.

  Every test builds a ``TestPipeline`` flagged as an integration test and
  passes the pipeline's 'project' option to the transform under test. The
  whole class is skipped when ``recommendationengine`` could not be imported.
  """
  def test_create_catalog_item(self):
    """CreateCatalogItem output carries the id of the submitted item."""
    # randrange already returns an int, so it is stringified directly.
    item_id = str(random.randrange(100000))
    catalog_item = {
        "id": item_id,
        "title": "Sample laptop",
        "description": "Indisputably the most fantastic laptop ever created.",
        "language_code": "en",
        "category_hierarchies": [{
            "categories": ["Electronic", "Computers"]
        }]
    }

    with TestPipeline(is_integration_test=True) as p:
      create_item = recommendations_ai.CreateCatalogItem(
          project=p.get_option('project'))
      items = p | 'Create data' >> beam.Create([catalog_item])
      created = items | 'Create CatalogItem' >> create_item
      # Gather every returned id into one list so a single comparison
      # covers the complete output.
      ids = created | beam.ParDo(extract_id) | beam.combiners.ToList()

      assert_that(ids, equal_to([[item_id]]))

  def test_create_user_event(self):
    """WriteUserEvent output carries the event type of the submitted event."""
    user_event = {"event_type": "page-visit", "user_info": {"visitor_id": "1"}}

    with TestPipeline(is_integration_test=True) as p:
      write_event = recommendations_ai.WriteUserEvent(
          project=p.get_option('project'))
      events = p | 'Create data' >> beam.Create([user_event])
      written = events | 'Create UserEvent' >> write_event
      event_types = (
          written | beam.ParDo(extract_event_type) | beam.combiners.ToList())

      assert_that(event_types, equal_to([[user_event["event_type"]]]))

  def test_create_predict(self):
    """PredictUserEvent produces a non-empty output for a user event."""
    user_event = {"event_type": "page-visit", "user_info": {"visitor_id": "1"}}

    with TestPipeline(is_integration_test=True) as p:
      predict = recommendations_ai.PredictUserEvent(
          project=p.get_option('project'),
          placement_id="recently_viewed_default")
      events = p | 'Create data' >> beam.Create([user_event])
      responses = events | 'Predict UserEvent' >> predict
      predictions = responses | beam.ParDo(extract_prediction)

      # Only the presence of output is checked, not its contents.
      assert_that(predictions, is_not_empty())
if __name__ == '__main__':
  # Debug aid: report which module provides CatalogItem. The import fallback
  # sets recommendationengine to None when the dependency is missing, so the
  # attribute access is guarded; otherwise it would raise AttributeError
  # before unittest.main() gets the chance to report the tests as skipped.
  if recommendationengine is not None:
    print(recommendationengine.CatalogItem.__module__)
  unittest.main()