blob: f902f748646cf50536cb37f5a5a7a4cc111b5ca2 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import unittest
import requests
import json
import argparse
import dateutil.parser
import pytz
from subprocess import Popen
from utils import AppEngine, pjoin
from pio_tests.integration import BaseTestCase, AppContext
class EventserverTest(BaseTestCase):
""" Integration test for PredictionIO Eventserver API
Refer to below for further information:
http://predictionio.incubator.apache.org/datacollection/eventmodel/
http://predictionio.incubator.apache.org/datacollection/eventapi/
"""
# Helper methods
def eventserver_url(self, path=None):
url = 'http://{}:{}'.format(
self.test_context.es_ip, self.test_context.es_port)
if path: url += '/{}'.format(path)
return url
def load_events(self, json_file):
file_path = pjoin(self.test_context.data_directory,
'eventserver_test/{}'.format(json_file))
return json.loads(open(file_path).read())
def setUp(self):
template_path = pjoin(
self.test_context.engine_directory, "recommendation-engine")
app_context = AppContext(
name="MyRecommender",
template=template_path)
self.app = AppEngine(self.test_context, app_context)
def runTest(self):
self.log.info("Check if Eventserver is alive and running")
r = requests.get(self.eventserver_url())
self.assertDictEqual(r.json(), {"status": "alive"})
self.log.info("Cannot view events with empty accessKey")
r = requests.get(self.eventserver_url(path='events.json'))
self.assertDictEqual(r.json(), {"message": "Missing accessKey."})
self.log.info("Cannot view events with invalid accessKey")
r = requests.get(self.eventserver_url(path='events.json'),
params={'accessKey': ''})
self.assertDictEqual(r.json(), {"message": "Invalid accessKey."})
self.log.info("Adding new pio application")
self.app.new()
self.log.info("No events have been sent yet")
r = self.app.get_events()
self.assertDictEqual(r.json(), {"message": "Not Found"})
# Testing POST
self.log.info("Sending single event")
event1 = {
'event' : 'test',
'entityType' : 'test',
'entityId' : 't1'
}
r = self.app.send_event(event1)
self.assertEqual(201, r.status_code)
self.log.info("Sending batch of events")
r = self.app.send_events_batch(
self.load_events("rate_events_25.json"))
self.assertEqual(200, r.status_code)
self.log.info("Cannot send more than 50 events per batch")
r = self.app.send_events_batch(
self.load_events("signup_events_51.json"))
self.assertEqual(400, r.status_code)
self.log.info("Importing events from file does not have batch size limit")
self.app.import_events_batch(
self.load_events("signup_events_51.json"))
self.log.info("Individual events may fail when sending events as batch")
r = self.app.send_events_batch(
self.load_events("partially_malformed_events.json"))
self.assertEqual(200, r.status_code)
self.assertEqual(201, r.json()[0]['status'])
self.assertEqual(400, r.json()[1]['status'])
# Testing GET for different parameters
params = {'event': 'rate'}
r = self.app.get_events(params=params)
self.assertEqual(20, len(r.json()))
self.assertEqual('rate', r.json()[0]['event'])
params = {
'event': 'rate',
'limit': -1 }
r = self.app.get_events(params=params)
self.assertEqual(25, len(r.json()))
self.assertEqual('rate', r.json()[0]['event'])
params = {
'event': 'rate',
'limit': 10 }
r = self.app.get_events(params=params)
self.assertEqual(10, len(r.json()))
self.assertEqual('rate', r.json()[0]['event'])
params = {
'event': 'rate',
'entityType': 'user',
'entityId': '1' }
r = self.app.get_events(params=params)
self.assertEqual(5, len(r.json()))
self.assertEqual('1', r.json()[0]['entityId'])
params = {
'event': 'rate',
'targetEntityType': 'item',
'targetEntityId': '1' }
r = self.app.get_events(params=params)
self.assertEqual(5, len(r.json()))
self.assertEqual('1', r.json()[0]['targetEntityId'])
params = {
'event': 'rate',
'entityType': 'user',
'entityId': '1',
'startTime': '2014-11-01T09:39:45.618-08:00',
'untilTime': '2014-11-04T09:39:45.618-08:00' }
r = self.app.get_events(params=params)
self.assertEqual(3, len(r.json()))
self.assertEqual('1', r.json()[0]['entityId'])
params = {
'event': 'rate',
'entityType': 'user',
'entityId': '1',
'reversed': 'true' }
r = self.app.get_events(params=params)
self.assertEqual(5, len(r.json()))
event_time = dateutil.parser.parse(r.json()[0]['eventTime']).astimezone(pytz.utc)
self.assertEqual('2014-11-05 17:39:45.618000+00:00', str(event_time))
def tearDown(self):
self.log.info("Deleting all app data")
self.app.delete_data()
self.log.info("Deleting app")
self.app.delete()
if __name__ == '__main__':
suite = unittest.TestSuite([BasicEventserverTest])
result = unittest.TextTestRunner(verbosity=2).run(suite)
if not result.wasSuccessful():
sys.exit(1)