Merge branch 'develop'
diff --git a/.gitignore b/.gitignore
index 20535e3..6311c3e 100644
--- a/.gitignore
+++ b/.gitignore
@@ -2,6 +2,7 @@
.project
.pydevproject
.settings
+*.swp
build/
dist/
MANIFEST
diff --git a/docs/source/conf.py b/docs/source/conf.py
index bce5d90..43be1a0 100644
--- a/docs/source/conf.py
+++ b/docs/source/conf.py
@@ -49,9 +49,9 @@
# built documents.
#
# The short X.Y version.
-version = '0.6'
+version = '0.7'
# The full version, including alpha/beta/rc tags.
-release = '0.6.3'
+release = '0.7.0'
# The language for content autogenerated by Sphinx. Refer to documentation
# for a list of supported languages.
diff --git a/docs/source/predictionio.rst b/docs/source/predictionio.rst
index 4890e5c..4249a99 100644
--- a/docs/source/predictionio.rst
+++ b/docs/source/predictionio.rst
@@ -79,6 +79,18 @@
>>> result = client.get_itemsim_topn("engine-2", "i200", 5)
+ To get top 5 similar items given a list of items i200,i300,i400 from the item similarity engine "engine-2"
+
+ >>> result = client.get_itemsim_topn("engine-2", "i200,i300,i400", 5)
+
+**Item Rank Engine**
+
+ To rank a list of items i100, i101, i102, i103 for user "u100" from the item rank engine "engine-3"
+
+ >>> client.identify("u100")
+ >>> result = client.get_itemrank_ranked("engine-3", ["i100","i101","i102", "i103"])
+
+
Please refer to the documentation of the :class:`predictionio.Client` class for more details of all available methods.
@@ -193,7 +205,7 @@
.. automethod:: pending_requests
.. versionadded:: 0.6.1
-
+
|
.. _sync-methods-label:
@@ -224,13 +236,17 @@
.. versionadded:: 0.6.0
+ .. automethod:: get_itemrank_ranked
+
+ .. versionadded:: 0.7.0
+
.. automethod:: get_itemrec
-
+
.. deprecated:: 0.5.0
Use :func:`get_itemrec_topn` instead.
.. automethod:: user_conversion_item
-
+
.. deprecated:: 0.5.0
Use :func:`record_action_on_item` instead.
@@ -283,6 +299,10 @@
.. versionadded:: 0.6.0
+ .. automethod:: aget_itemrank_ranked
+
+ .. versionadded:: 0.7.0
+
.. automethod:: aget_itemrec
.. deprecated:: 0.5.0
@@ -314,5 +334,3 @@
Use :func:`arecord_action_on_item` instead.
.. automethod:: aresp
-
-
diff --git a/examples/__init__.py b/examples/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/examples/__init__.py
diff --git a/examples/itemrec/__init__.py b/examples/itemrec/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/examples/itemrec/__init__.py
diff --git a/examples/itemrec/movies/README.md b/examples/itemrec/movies/README.md
new file mode 100644
index 0000000..9ef357a
--- /dev/null
+++ b/examples/itemrec/movies/README.md
@@ -0,0 +1,15 @@
+PredictionIO Python SDK Example
+===============================
+
+Please execute all commands from repository root.
+
+Step 1. Get sample data and unzip it.
+
+ > wget http://www.grouplens.org/system/files/ml-100k.zip
+ > unzip ml-100k.zip
+
+Step 2. Configure examples/itemrec/movies/appdata.py
+
+Step 3. Run this app:
+
+ > python -m examples.itemrec.movies.movie_rec_app
diff --git a/examples/itemrec/movies/__init__.py b/examples/itemrec/movies/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/examples/itemrec/movies/__init__.py
diff --git a/examples/itemrec/movies/app_config.py b/examples/itemrec/movies/app_config.py
index 24636ab..5efa7a0 100644
--- a/examples/itemrec/movies/app_config.py
+++ b/examples/itemrec/movies/app_config.py
@@ -1,4 +1,2 @@
-
-APP_KEY = 'wWKeyokhpwEgis9BHCHkwqB7Hcop0OEF6KzkZWXfz2uMJYjl2QDwNlFVGl5hhHfy'
+APP_KEY = 'uJKTKyUAFNZYQQO5yxkdrSo3XIlaf9LXejI63CWE0mtZVEYF89hyVtOwpMKfXXXX'
API_URL = 'http://localhost:8000'
-
diff --git a/examples/itemrec/movies/appdata.py b/examples/itemrec/movies/appdata.py
index f7dad34..fb3c7fd 100644
--- a/examples/itemrec/movies/appdata.py
+++ b/examples/itemrec/movies/appdata.py
@@ -15,134 +15,134 @@
class User:
- def __init__(self, uid):
- self.uid = uid
- self.rec = [] # recommendations, list of iid
+ def __init__(self, uid):
+ self.uid = uid
+ self.rec = [] # recommendations, list of iid
- def __str__(self):
- return "User[uid=%s,rec=%s]" % (self.uid, self.rec)
+ def __str__(self):
+ return "User[uid=%s,rec=%s]" % (self.uid, self.rec)
class Item:
- def __init__(self, iid, name):
- self.iid = iid
- self.name = name
+ def __init__(self, iid, name):
+ self.iid = iid
+ self.name = name
- def __str__(self):
- return "Item[iid=%s,name=%s]" % (self.iid, self.name)
+ def __str__(self):
+ return "Item[iid=%s,name=%s]" % (self.iid, self.name)
class RateAction:
- def __init__(self, uid, iid, rating, t):
- self.uid = uid
- self.iid = iid
- self.rating = rating
- self.t = t
+ def __init__(self, uid, iid, rating, t):
+ self.uid = uid
+ self.iid = iid
+ self.rating = rating
+ self.t = t
- def __str__(self):
- return "RateAction[uid=%s,iid=%s,rating=%s,t=%s]" % (self.uid, self.iid, self.rating, self.t)
+ def __str__(self):
+ return "RateAction[uid=%s,iid=%s,rating=%s,t=%s]" % (self.uid, self.iid, self.rating, self.t)
class AppData:
- def __init__(self):
- self._users = {} # dict of User obj
- self._items = {} # dict of Item obj
- self._rate_actions = [] # list of RateAction obj
+ def __init__(self):
+ self._users = {} # dict of User obj
+ self._items = {} # dict of Item obj
+ self._rate_actions = [] # list of RateAction obj
- self._users_file = "%s/%s" % (APPDATA_DIRNAME, USERS_FILENAME)
- self._items_file = "%s/%s" % (APPDATA_DIRNAME, ITEMS_FILENAME)
- self._rate_actions_file = "%s/%s" % (APPDATA_DIRNAME, RATE_ACTIONS_FILENAME)
- self.__init_users()
- self.__init_items()
- self.__init_rate_actions()
+ self._users_file = "%s/%s" % (APPDATA_DIRNAME, USERS_FILENAME)
+ self._items_file = "%s/%s" % (APPDATA_DIRNAME, ITEMS_FILENAME)
+ self._rate_actions_file = "%s/%s" % (APPDATA_DIRNAME, RATE_ACTIONS_FILENAME)
+ self.__init_users()
+ self.__init_items()
+ self.__init_rate_actions()
- def __init_users(self):
- """
- uid|
- """
- print "[Info] Initializing users..."
- f = open(self._users_file, 'r')
- for line in f:
- data = line.rstrip('\r\n').split(USERS_FILE_DELIMITER)
- self.add_user(User(data[0]))
- f.close()
- print "[Info] %s users were initialized." % len(self._users)
+ def __init_users(self):
+ """
+ uid|
+ """
+ print "[Info] Initializing users..."
+ f = open(self._users_file, 'r')
+ for line in f:
+ data = line.rstrip('\r\n').split(USERS_FILE_DELIMITER)
+ self.add_user(User(data[0]))
+ f.close()
+ print "[Info] %s users were initialized." % len(self._users)
- def __init_items(self):
- """
- iid|name
- """
- print "[Info] Initializing items..."
- f = open(self._items_file, 'r')
- for line in f:
- data = line.rstrip('\r\n').split(ITEMS_FILE_DELIMITER)
- self.add_item(Item(data[0], data[1]))
- f.close()
- print "[Info] %s items were initialized." % len(self._items)
+ def __init_items(self):
+ """
+ iid|name
+ """
+ print "[Info] Initializing items..."
+ f = open(self._items_file, 'r')
+ for line in f:
+ data = line.rstrip('\r\n').split(ITEMS_FILE_DELIMITER)
+ self.add_item(Item(data[0], data[1]))
+ f.close()
+ print "[Info] %s items were initialized." % len(self._items)
- def __init_rate_actions(self):
- """
- uid|iid|rating|timestamp
- """
- print "[Info] Initializing rate actions..."
- f = open(self._rate_actions_file, 'r')
- for line in f:
- data = line.rstrip('\r\n').split(RATE_ACTIONS_DELIMITER)
- t = datetime.datetime.utcfromtimestamp(int(data[3])).isoformat()
- self.add_rate_action(RateAction(data[0], data[1], data[2], t))
- f.close()
- print "[Info] %s rate actions were initialized." % len(self._rate_actions)
+ def __init_rate_actions(self):
+ """
+ uid|iid|rating|timestamp
+ """
+ print "[Info] Initializing rate actions..."
+ f = open(self._rate_actions_file, 'r')
+ for line in f:
+ data = line.rstrip('\r\n').split(RATE_ACTIONS_DELIMITER)
+ t = datetime.datetime.utcfromtimestamp(int(data[3])).isoformat()
+ self.add_rate_action(RateAction(data[0], data[1], data[2], t))
+ f.close()
+ print "[Info] %s rate actions were initialized." % len(self._rate_actions)
- def add_user(self, user):
- self._users[user.uid] = user
+ def add_user(self, user):
+ self._users[user.uid] = user
- def add_item(self, item):
- self._items[item.iid] = item
+ def add_item(self, item):
+ self._items[item.iid] = item
- def add_rate_action(self, action):
- self._rate_actions.append(action)
+ def add_rate_action(self, action):
+ self._rate_actions.append(action)
- def get_users(self):
- return self._users
+ def get_users(self):
+ return self._users
- def get_items(self):
- return self._items
+ def get_items(self):
+ return self._items
- def get_rate_actions(self):
- return self._rate_actions
+ def get_rate_actions(self):
+ return self._rate_actions
- def get_user(self, uid):
- """return single user
- """
- if uid in self._users:
- return self._users[uid]
- else:
- return None
+ def get_user(self, uid):
+ """return single user
+ """
+ if uid in self._users:
+ return self._users[uid]
+ else:
+ return None
- def get_item(self, iid):
- """return single item
- """
- if iid in self._items:
- return self._items[iid]
- else:
- return None
+ def get_item(self, iid):
+ """return single item
+ """
+ if iid in self._items:
+ return self._items[iid]
+ else:
+ return None
- def get_top_rated_items(self, uid, n):
- """get top n rated iids by this uid
- """
- if uid in self._users:
- actions = filter(lambda u: u.uid==uid, self._rate_actions)
- top = sorted(actions, key=attrgetter('rating'), reverse=True)
- topn_iids = map(lambda a: a.iid, top[:n])
- return topn_iids
- else:
- return None
+ def get_top_rated_items(self, uid, n):
+ """get top n rated iids by this uid
+ """
+ if uid in self._users:
+ actions = filter(lambda u: u.uid==uid, self._rate_actions)
+ top = sorted(actions, key=attrgetter('rating'), reverse=True)
+ topn_iids = map(lambda a: a.iid, top[:n])
+ return topn_iids
+ else:
+ return None
- def get_top_rate_actions(self, uid, n):
- """get top n rated actions by this uid
- """
- if uid in self._users:
- actions = filter(lambda u: u.uid==uid, self._rate_actions)
- top = sorted(actions, key=attrgetter('rating'), reverse=True)
- return top[:n]
- else:
- return None
+ def get_top_rate_actions(self, uid, n):
+ """get top n rated actions by this uid
+ """
+ if uid in self._users:
+ actions = filter(lambda u: u.uid==uid, self._rate_actions)
+ top = sorted(actions, key=attrgetter('rating'), reverse=True)
+ return top[:n]
+ else:
+ return None
diff --git a/examples/itemrec/movies/batch_import.py b/examples/itemrec/movies/batch_import.py
index 1cb797a..b2e0e63 100644
--- a/examples/itemrec/movies/batch_import.py
+++ b/examples/itemrec/movies/batch_import.py
@@ -1,4 +1,3 @@
-
from appdata import AppData
import predictionio
import sys
@@ -7,60 +6,60 @@
def batch_import_task(app_data, client, all_info=False):
- print "[Info] Importing users to PredictionIO..."
- count = 0
- for k, v in app_data.get_users().iteritems():
- count += 1
- if all_info:
- print "[Info] Importing %s..." % v
- else:
- if (count % 32 == 0):
- sys.stdout.write('\r[Info] %s' % count)
- sys.stdout.flush()
+ print "[Info] Importing users to PredictionIO..."
+ count = 0
+ for k, v in app_data.get_users().iteritems():
+ count += 1
+ if all_info:
+ print "[Info] Importing %s..." % v
+ else:
+ if (count % 32 == 0):
+ sys.stdout.write('\r[Info] %s' % count)
+ sys.stdout.flush()
- client.create_user(v.uid)
-
- sys.stdout.write('\r[Info] %s users were imported.\n' % count)
- sys.stdout.flush()
+ client.create_user(v.uid)
+
+ sys.stdout.write('\r[Info] %s users were imported.\n' % count)
+ sys.stdout.flush()
- print "[Info] Importing items to PredictionIO..."
- count = 0
- for k, v in app_data.get_items().iteritems():
- count += 1
- if all_info:
- print "[Info] Importing %s..." % v
- else:
- if (count % 32 == 0):
- sys.stdout.write('\r[Info] %s' % count)
- sys.stdout.flush()
+ print "[Info] Importing items to PredictionIO..."
+ count = 0
+ for k, v in app_data.get_items().iteritems():
+ count += 1
+ if all_info:
+ print "[Info] Importing %s..." % v
+ else:
+ if (count % 32 == 0):
+ sys.stdout.write('\r[Info] %s' % count)
+ sys.stdout.flush()
- client.create_item(v.iid, ("movie",))
-
- sys.stdout.write('\r[Info] %s items were imported.\n' % count)
- sys.stdout.flush()
+ client.create_item(v.iid, ("movie",))
+
+ sys.stdout.write('\r[Info] %s items were imported.\n' % count)
+ sys.stdout.flush()
- print "[Info] Importing rate actions to PredictionIO..."
- count = 0
- for v in app_data.get_rate_actions():
- count += 1
- if all_info:
- print "[Info] Importing %s..." % v
- else:
- if (count % 32 == 0):
- sys.stdout.write('\r[Info] %s' % count)
- sys.stdout.flush()
+ print "[Info] Importing rate actions to PredictionIO..."
+ count = 0
+ for v in app_data.get_rate_actions():
+ count += 1
+ if all_info:
+ print "[Info] Importing %s..." % v
+ else:
+ if (count % 32 == 0):
+ sys.stdout.write('\r[Info] %s' % count)
+ sys.stdout.flush()
- client.identify(v.uid)
- client.record_action_on_item("rate", v.iid, { "pio_rate": v.rating, "pio_t": v.t })
+ client.identify(v.uid)
+ client.record_action_on_item("rate", v.iid, { "pio_rate": v.rating, "pio_t": v.t })
- sys.stdout.write('\r[Info] %s rate actions were imported.\n' % count)
- sys.stdout.flush()
+ sys.stdout.write('\r[Info] %s rate actions were imported.\n' % count)
+ sys.stdout.flush()
if __name__ == '__main__':
- app_data = AppData()
- client = predictionio.Client(APP_KEY, 1, API_URL)
- batch_import_task(app_data, client)
- client.close()
+ app_data = AppData()
+ client = predictionio.Client(APP_KEY, 1, API_URL)
+ batch_import_task(app_data, client)
+ client.close()
diff --git a/examples/itemrec/movies/movie_rec_app.py b/examples/itemrec/movies/movie_rec_app.py
index 21bc51e..436af38 100644
--- a/examples/itemrec/movies/movie_rec_app.py
+++ b/examples/itemrec/movies/movie_rec_app.py
@@ -1,3 +1,15 @@
+# To run this example app
+#
+# Please execute all commands from repository root.
+#
+# Step 1. Get sample data and unzip it.
+# > wget http://www.grouplens.org/system/files/ml-100k.zip
+# > unzip ml-100k.zip
+#
+# Step 2. Configure examples/itemrec/movies/appdata.py
+#
+# Step 3. Run this app:
+# python -m examples.itemrec.movies.movie_rec_app
from appdata import AppData
import predictionio
@@ -9,132 +21,132 @@
class App:
- def __init__(self):
- self._app_data = AppData()
- self._client = predictionio.Client(APP_KEY, 1, API_URL)
+ def __init__(self):
+ self._app_data = AppData()
+ self._client = predictionio.Client(APP_KEY, 1, API_URL)
- def run(self):
- state = "[Main Menu]"
+ def run(self):
+ state = "[Main Menu]"
- prompt = "\n"\
- "%s\n"\
- "%s\n"\
- "Please input selection:\n"\
- " 0: Quit application.\n"\
- " 1: Get Recommendations from PredictionIO.\n"\
- " 2: Display user's data." % (state, '-'*len(state))
+ prompt = "\n"\
+ "%s\n"\
+ "%s\n"\
+ "Please input selection:\n"\
+ " 0: Quit application.\n"\
+ " 1: Get Recommendations from PredictionIO.\n"\
+ " 2: Display user's data." % (state, '-'*len(state))
- while True:
- print prompt
- choice = raw_input().lower()
- if choice == '0':
- print "\nGood Bye!\n"
- break
- elif choice == '1':
- self.recommend_task(state)
- elif choice == '2':
- self.display_user_task(state)
- else:
- print '[Error] \'%s\' is not a valid selection.' % choice
+ while True:
+ print prompt
+ choice = raw_input().lower()
+ if choice == '0':
+ print "\nGood Bye!\n"
+ break
+ elif choice == '1':
+ self.recommend_task(state)
+ elif choice == '2':
+ self.display_user_task(state)
+ else:
+ print '[Error] \'%s\' is not a valid selection.' % choice
- self._client.close()
+ self._client.close()
- def recommend_task(self, prev_state):
- state = prev_state + " / [Get Recommendations]"
- prompt = "\n"\
- "%s\n"\
- "%s\n"\
- "Please enter user id:" % (state, '-'*len(state))
+ def recommend_task(self, prev_state):
+ state = prev_state + " / [Get Recommendations]"
+ prompt = "\n"\
+ "%s\n"\
+ "%s\n"\
+ "Please enter user id:" % (state, '-'*len(state))
- while True:
- print prompt
- choice = raw_input().lower()
- u = self._app_data.get_user(choice)
- if u:
- n = 10
- print "[Info] Getting top %s item recommendations for user %s..." % (n, u.uid)
- try:
- self._client.identify(u.uid)
- rec = self._client.get_itemrec_topn(ENGINE_NAME, n)
- u.rec = rec['pio_iids']
- self.display_items(u.rec)
- except predictionio.ItemRecNotFoundError:
- print "[Info] Recommendation not found"
+ while True:
+ print prompt
+ choice = raw_input().lower()
+ u = self._app_data.get_user(choice)
+ if u:
+ n = 10
+ print "[Info] Getting top %s item recommendations for user %s..." % (n, u.uid)
+ try:
+ self._client.identify(u.uid)
+ rec = self._client.get_itemrec_topn(ENGINE_NAME, n)
+ u.rec = rec['pio_iids']
+ self.display_items(u.rec)
+ except predictionio.ItemRecNotFoundError:
+ print "[Info] Recommendation not found"
- print "[Info] Go back to previous menu..."
- break
- else:
- print "[Error] invalid user id %s. Go back to previous menu..." % choice
- break
+ print "[Info] Go back to previous menu..."
+ break
+ else:
+ print "[Error] invalid user id %s. Go back to previous menu..." % choice
+ break
- def display_user_task(self, prev_state):
- state = prev_state + " / [Display User]"
- prompt = "\n"\
- "%s\n"\
- "%s\n"\
- "Please enter user id:" % (state, '-'*len(state))
+ def display_user_task(self, prev_state):
+ state = prev_state + " / [Display User]"
+ prompt = "\n"\
+ "%s\n"\
+ "%s\n"\
+ "Please enter user id:" % (state, '-'*len(state))
- while True:
- print prompt
- choice = raw_input().lower()
- u = self._app_data.get_user(choice)
- if u:
- print "[Info] User %s:" % u.uid
- n = 10
- topn_rate_actions = self._app_data.get_top_rate_actions(u.uid, n)
- print "\n[Info] Top %s movies rated by this user:" % n
- self.display_rate_actions(topn_rate_actions)
+ while True:
+ print prompt
+ choice = raw_input().lower()
+ u = self._app_data.get_user(choice)
+ if u:
+ print "[Info] User %s:" % u.uid
+ n = 10
+ topn_rate_actions = self._app_data.get_top_rate_actions(u.uid, n)
+ print "\n[Info] Top %s movies rated by this user:" % n
+ self.display_rate_actions(topn_rate_actions)
- print "\n[Info] Movies recommended to this user:"
- self.display_items(u.rec)
+ print "\n[Info] Movies recommended to this user:"
+ self.display_items(u.rec)
- self.wait_for_ack()
- print "\n[Info] Go back to previous menu..."
- break
- else:
- print "[Error] invalid user id %s. Go back to previous menu..." % choice
- break
-
- def display_items(self, iids, all_info=False):
- """print item info for each iid in the list
- """
- if iids:
- for iid in iids:
- item = self._app_data.get_item(iid)
- if item:
- if all_info:
- print "[Info] %s" % item
- else:
- print "[Info] %s" % item.name
- else:
- print "[Error] Invalid item id %s" % iid
- else:
- print "[Info] Empty."
+ self.wait_for_ack()
+ print "\n[Info] Go back to previous menu..."
+ break
+ else:
+ print "[Error] invalid user id %s. Go back to previous menu..." % choice
+ break
+
+ def display_items(self, iids, all_info=False):
+ """print item info for each iid in the list
+ """
+ if iids:
+ for iid in iids:
+ item = self._app_data.get_item(iid)
+ if item:
+ if all_info:
+ print "[Info] %s" % item
+ else:
+ print "[Info] %s" % item.name
+ else:
+ print "[Error] Invalid item id %s" % iid
+ else:
+ print "[Info] Empty."
- def display_rate_actions(self, actions):
- """print iid and rating
- """
- if actions:
- for a in actions:
- item = self._app_data.get_item(a.iid)
- if item:
- print "[Info] %s, rating = %s" % (item.name, a.rating)
- else:
- print "[Error] Invalid item id %s" % a.iid
- else:
- print "[Info] Empty."
+ def display_rate_actions(self, actions):
+ """print iid and rating
+ """
+ if actions:
+ for a in actions:
+ item = self._app_data.get_item(a.iid)
+ if item:
+ print "[Info] %s, rating = %s" % (item.name, a.rating)
+ else:
+ print "[Error] Invalid item id %s" % a.iid
+ else:
+ print "[Info] Empty."
- def wait_for_ack(self):
+ def wait_for_ack(self):
- prompt = "\nPress enter to continue..."
- print prompt
- choice = raw_input().lower()
+ prompt = "\nPress enter to continue..."
+ print prompt
+ choice = raw_input().lower()
if __name__ == '__main__':
- print "\nWelcome To PredictionIO Python-SDK Demo App!"
- print "============================================\n"
-
- my_app = App()
- my_app.run()
+ print "\nWelcome To PredictionIO Python-SDK Demo App!"
+ print "============================================\n"
+
+ my_app = App()
+ my_app.run()
diff --git a/predictionio/__init__.py b/predictionio/__init__.py
index 648c0cd..30e39e6 100644
--- a/predictionio/__init__.py
+++ b/predictionio/__init__.py
@@ -6,18 +6,20 @@
__author__ = "The PredictionIO Team"
__email__ = "help@tappingstone.com"
-__copyright__ = "Copyright 2013, TappingStone, Inc."
+__copyright__ = "Copyright 2014, TappingStone, Inc."
__license__ = "Apache License, Version 2.0"
-__version__ = "0.6.3"
+__version__ = "0.7.0"
# import packages
import re
try:
- import httplib
+ import httplib
except ImportError:
- from http import client as httplib
+ # pylint: disable=F0401
+ # http is a Python3 module, replacing httplib
+ from http import client as httplib
import json
import urllib
@@ -33,68 +35,72 @@
class ServerStatusError(PredictionIOAPIError):
- "Error happened when tried to get status of the API server"
- pass
+ "Error happened when tried to get status of the API server"
+ pass
class UserNotCreatedError(PredictionIOAPIError):
- "Error happened when tried to create user"
- pass
+ "Error happened when tried to create user"
+ pass
class UserNotFoundError(PredictionIOAPIError):
- "Error happened when tried to get user"
- pass
+ "Error happened when tried to get user"
+ pass
class UserNotDeletedError(PredictionIOAPIError):
- "Error happened when tried to delete user"
- pass
+ "Error happened when tried to delete user"
+ pass
class ItemNotCreatedError(PredictionIOAPIError):
- "Error happened when tried to create item"
- pass
+ "Error happened when tried to create item"
+ pass
class ItemNotFoundError(PredictionIOAPIError):
- "Error happened when tried to get item"
- pass
+ "Error happened when tried to get item"
+ pass
class ItemNotDeletedError(PredictionIOAPIError):
- "Error happened when tried to delete item"
- pass
+ "Error happened when tried to delete item"
+ pass
class U2IActionNotCreatedError(PredictionIOAPIError):
- "Error happened when tried to create user-to-item action"
- pass
+ "Error happened when tried to create user-to-item action"
+ pass
class ItemRecNotFoundError(PredictionIOAPIError):
- "Error happened when tried to get item recommendation"
- pass
+ "Error happened when tried to get item recommendation"
+ pass
class ItemSimNotFoundError(PredictionIOAPIError):
- "Error happened when tried to get similar items"
- pass
+ "Error happened when tried to get similar items"
+ pass
+class ItemRankNotFoundError(PredictionIOAPIError):
+
+ "Error happened when tried to rank item"
+ pass
class InvalidArgumentError(PredictionIOAPIError):
- "Arguments are not valid"
- pass
+ "Arguments are not valid"
+ pass
# map to API
LIKE_API = "like"
@@ -106,1012 +112,1103 @@
class Client(object):
- """PredictionIO client object.
+ """PredictionIO client object.
- This is an object representing a PredictionIO's client. This object
- provides methods for making PredictionIO API requests.
+ This is an object representing a PredictionIO's client. This object
+ provides methods for making PredictionIO API requests.
- :param appkey: the App Key provided by PredictionIO.
- :param threads: number of threads to handle PredictionIO API requests.
- Must be >= 1.
- :param apiurl: the PredictionIO API URL path.
- :param apiversion: the PredictionIO API version. (optional)
- (eg. "", or "/v1")
- :param qsize: the max size of the request queue (optional).
- The asynchronous request becomes blocking once this size has been
- reached, until the queued requests are handled.
- Default value is 0, which means infinite queue size.
- :param timeout: timeout for HTTP connection attempts and requests in
- seconds (optional).
- Default value is 5.
+ :param appkey: the App Key provided by PredictionIO.
+ :param threads: number of threads to handle PredictionIO API requests.
+ Must be >= 1.
+ :param apiurl: the PredictionIO API URL path.
+ :param apiversion: the PredictionIO API version. (optional)
+ (eg. "", or "/v1")
+ :param qsize: the max size of the request queue (optional).
+ The asynchronous request becomes blocking once this size has been
+ reached, until the queued requests are handled.
+ Default value is 0, which means infinite queue size.
+ :param timeout: timeout for HTTP connection attempts and requests in
+ seconds (optional).
+ Default value is 5.
+
+ """
+
+ def __init__(self, appkey, threads=1, apiurl="http://localhost:8000",
+ apiversion="", qsize=0, timeout=5):
+ """Constructor of Client object.
+
+ """
+ self.appkey = appkey
+ self.threads = threads
+ self.apiurl = apiurl
+ self.apiversion = apiversion
+ self.qsize = qsize
+ self.timeout = timeout
+
+ # check connection type
+ https_pattern = r'^https://(.*)'
+ http_pattern = r'^http://(.*)'
+ m = re.match(https_pattern, apiurl)
+ self.https = True
+ if m is None: # not matching https
+ m = re.match(http_pattern, apiurl)
+ self.https = False
+ if m is None: # not matching http either
+ raise InvalidArgumentError("apiurl is not valid: %s" % apiurl)
+ self.host = m.group(1)
+
+ self._uid = None # identified uid
+ self._connection = Connection(host=self.host, threads=self.threads,
+ qsize=self.qsize, https=self.https,
+ timeout=self.timeout)
+
+ def close(self):
+ """Close this client and the connection.
+
+ Call this method when you want to completely terminate the connection
+ with PredictionIO.
+ It will wait for all pending requests to finish.
+ """
+ self._connection.close()
+
+ def pending_requests(self):
+ """Return the number of pending requests.
+
+ :returns:
+ The number of pending requests of this client.
+ """
+ return self._connection.pending_requests()
+
+ def identify(self, uid):
+ """Identify the uid
+
+ :param uid: user id. type str.
+ """
+ self._uid = uid
+
+ def get_status(self):
+ """Get the status of the PredictionIO API Server
+
+ :returns:
+ status message.
+
+ :raises:
+ ServerStatusError.
+ """
+ path = "/"
+ request = AsyncRequest("GET", path)
+ request.set_rfunc(self._aget_status_resp)
+ self._connection.make_request(request)
+ result = self.aresp(request)
+ return result
+
+ def _aget_status_resp(self, response):
+ """Handle the AsyncResponse of get status request"""
+ if response.error is not None:
+ raise ServerStatusError("Exception happened: %s for request %s" %
+ (response.error, response.request))
+ elif response.status != httplib.OK:
+ raise ServerStatusError("request: %s status: %s body: %s" %
+ (response.request, response.status,
+ response.body))
+
+ # data = json.loads(response.body) # convert json string to dict
+ return response.body
+
+ def acreate_user(self, uid, params={}):
+ """Asynchronously create a user.
+
+ :param uid: user id. type str.
+ :param params: optional attributes. type dictionary.
+ For example,
+ { 'custom': 'value', 'pio_inactive' : True,
+ 'pio_latlng': [4.5,67.8] }
+
+ :returns:
+ AsyncRequest object. You should call the aresp() method using this
+ AsyncRequest object as argument to get the final result or status
+ of this asynchronous request.
"""
- def __init__(self, appkey, threads=1, apiurl="http://localhost:8000",
- apiversion="", qsize=0, timeout=5):
- """Constructor of Client object.
+ if "pio_latlng" in params:
+ params["pio_latlng"] = ",".join(map(str, params["pio_latlng"]))
+ if "pio_inactive" in params:
+ params["pio_inactive"] = str(params["pio_inactive"]).lower()
- """
- self.appkey = appkey
- self.threads = threads
- self.apiurl = apiurl
- self.apiversion = apiversion
- self.qsize = qsize
- self.timeout = timeout
+ path = "%s/users.json" % self.apiversion
+ request = AsyncRequest(
+ "POST", path, pio_appkey=self.appkey, pio_uid=uid, **params)
+ request.set_rfunc(self._acreate_user_resp)
+ self._connection.make_request(request)
- # check connection type
- https_pattern = r'^https://(.*)'
- http_pattern = r'^http://(.*)'
- m = re.match(https_pattern, apiurl)
- self.https = True
- if m is None: # not matching https
- m = re.match(http_pattern, apiurl)
- self.https = False
- if m is None: # not matching http either
- raise InvalidArgumentError("apiurl is not valid: %s" % apiurl)
- self.host = m.group(1)
+ return request
- self._uid = None # identified uid
- self._connection = Connection(host=self.host, threads=self.threads,
- qsize=self.qsize, https=self.https,
- timeout=self.timeout)
+ def _acreate_user_resp(self, response):
+ """Private function to handle the AsyncResponse of the acreate_user
+ request.
- def close(self):
- """Close this client and the connection.
+ :param response: AsyncResponse object.
- Call this method when you want to completely terminate the connection
- with PredictionIO.
- It will wait for all pending requests to finish.
- """
- self._connection.close()
+ :returns:
+ None.
- def pending_requests(self):
- """Return the number of pending requests.
+ :raises:
+ UserNotCreatedError.
- :returns:
- The number of pending requests of this client.
- """
- return self._connection.pending_requests()
+ """
+ if response.error is not None:
+ raise UserNotCreatedError("Exception happened: %s for request %s" %
+ (response.error, response.request))
+ elif response.status != httplib.CREATED:
+ raise UserNotCreatedError("request: %s status: %s body: %s" %
+ (response.request, response.status,
+ response.body))
- def identify(self, uid):
- """Identify the uid
+ return None
- :param uid: user id. type str.
- """
- self._uid = uid
+ def aget_user(self, uid):
+ """Asynchronously get user.
- def get_status(self):
- """Get the status of the PredictionIO API Server
+ :param uid: user id. type str.
- :returns:
- status message.
+ :returns:
+ AsyncRequest object. You should call the aresp() method using this
+ AsyncRequest object as argument to get the final result or status
+ of this asynchronous request.
+ """
- :raises:
- ServerStatusError.
- """
- path = "/"
- request = AsyncRequest("GET", path)
- request.set_rfunc(self._aget_status_resp)
- self._connection.make_request(request)
- result = self.aresp(request)
- return result
+ enc_uid = urllib.quote(uid, "") # replace special char with %xx
+ path = "%s/users/%s.json" % (self.apiversion, enc_uid)
+ request = AsyncRequest("GET", path, pio_appkey=self.appkey)
+ request.set_rfunc(self._aget_user_resp)
+ self._connection.make_request(request)
- def _aget_status_resp(self, response):
- """Handle the AsyncResponse of get status request"""
- if response.error is not None:
- raise ServerStatusError("Exception happened: %s for request %s" %
- (response.error, response.request))
- elif response.status != httplib.OK:
- raise ServerStatusError("request: %s status: %s body: %s" %
- (response.request, response.status,
- response.body))
+ return request
- # data = json.loads(response.body) # convert json string to dict
- return response.body
+ def _aget_user_resp(self, response):
+ """Private function to handle the AsyncResponse of the aget_user
+ request.
- def acreate_user(self, uid, params={}):
- """Asynchronously create a user.
+ :param response: AsyncResponse object.
- :param uid: user id. type str.
- :param params: optional attributes. type dictionary.
- For example,
- { 'custom': 'value', 'pio_inactive' : True,
- 'pio_latlng': [4.5,67.8] }
+ :returns:
+ User data in Dictionary format.
- :returns:
- AsyncRequest object. You should call the aresp() method using this
- AsyncRequest object as argument to get the final result or status
- of this asynchronous request.
+ :raises:
+ UserNotFoundError.
- """
+ """
+ if response.error is not None:
+ raise UserNotFoundError("Exception happened: %s for request %s" %
+ (response.error, response.request))
+ elif response.status != httplib.OK:
+ raise UserNotFoundError("request: %s status: %s body: %s" %
+ (response.request, response.status,
+ response.body))
- if "pio_latlng" in params:
- params["pio_latlng"] = ",".join(map(str, params["pio_latlng"]))
- if "pio_inactive" in params:
- params["pio_inactive"] = str(params["pio_inactive"]).lower()
+ data = json.loads(response.body) # convert json string to dict
+ return data
- path = "%s/users.json" % self.apiversion
- request = AsyncRequest(
- "POST", path, pio_appkey=self.appkey, pio_uid=uid, **params)
- request.set_rfunc(self._acreate_user_resp)
- self._connection.make_request(request)
+ def adelete_user(self, uid):
+ """Asynchronously delete user.
- return request
+ :param uid: user id. type str.
- def _acreate_user_resp(self, response):
- """Private function to handle the AsyncResponse of the acreate_user
- request.
+ :returns:
+ AsyncRequest object. You should call the aresp() method using this
+ AsyncRequest object as argument to get the final result or status
+ of this asynchronous request.
+ """
- :param response: AsyncResponse object.
+ enc_uid = urllib.quote(uid, "") # replace special char with %xx
+ path = "%s/users/%s.json" % (self.apiversion, enc_uid)
+ request = AsyncRequest("DELETE", path, pio_appkey=self.appkey)
+ request.set_rfunc(self._adelete_user_resp)
+ self._connection.make_request(request)
- :returns:
- None.
+ return request
- :raises:
- UserNotCreatedError.
+ def _adelete_user_resp(self, response):
+ """Private function to handle the AsyncResponse of the adelete_user
+ request.
- """
- if response.error is not None:
- raise UserNotCreatedError("Exception happened: %s for request %s" %
- (response.error, response.request))
- elif response.status != httplib.CREATED:
- raise UserNotCreatedError("request: %s status: %s body: %s" %
- (response.request, response.status,
- response.body))
+ :param response: AsyncResponse object.
- return None
+ :returns:
+ None.
- def aget_user(self, uid):
- """Asynchronously get user.
+ :raises:
+ UserNotDeletedError.
- :param uid: user id. type str.
+ """
+ if response.error is not None:
+ raise UserNotDeletedError("Exception happened: %s for request %s" %
+ (response.error, response.request))
+ elif response.status != httplib.OK:
+ raise UserNotDeletedError("request: %s status: %s body: %s" %
+ (response.request, response.status,
+ response.body))
+ return None
- :returns:
- AsyncRequest object. You should call the aresp() method using this
- AsyncRequest object as argument to get the final result or status
- of this asynchronous request.
- """
+ def acreate_item(self, iid, itypes, params={}):
+ """Asynchronously create item.
- enc_uid = urllib.quote(uid, "") # replace special char with %xx
- path = "%s/users/%s.json" % (self.apiversion, enc_uid)
- request = AsyncRequest("GET", path, pio_appkey=self.appkey)
- request.set_rfunc(self._aget_user_resp)
- self._connection.make_request(request)
+ :param iid: item id. type str.
+ :param itypes: item types. Tuple of Str.
+ For example, if this item belongs to item types "t1", "t2",
+ "t3", "t4", then itypes=("t1", "t2", "t3", "t4").
+ NOTE: if this item belongs to only one itype, use tuple of one
+ element, eg. itypes=("t1",)
+ :param params: optional attributes. type dictionary.
+ For example,
+ { 'custom': 'value', 'pio_inactive' : True,
+ 'pio_latlng': [4.5,67.8] }
+ :returns:
+ AsyncRequest object. You should call the aresp() method using this
+ AsyncRequest object as argument to get the final result or status
+ of this asynchronous request.
+ """
+ itypes_str = ",".join(itypes) # join items with ","
- return request
+ if "pio_latlng" in params:
+ params["pio_latlng"] = ",".join(map(str, params["pio_latlng"]))
+ if "pio_inactive" in params:
+ params["pio_inactive"] = str(params["pio_inactive"]).lower()
- def _aget_user_resp(self, response):
- """Private function to handle the AsyncResponse of the aget_user
- request .
+ path = "%s/items.json" % self.apiversion
+ request = AsyncRequest("POST", path, pio_appkey=self.appkey,
+ pio_iid=iid, pio_itypes=itypes_str, **params)
+ request.set_rfunc(self._acreate_item_resp)
+ self._connection.make_request(request)
+ return request
- :param response: AsyncResponse object.
+ def _acreate_item_resp(self, response):
+ """Private function to handle the AsyncResponse of the acreate_item
+ request
- :returns:
- User data in Dictionary format.
+ :param response: AsyncResponse object.
- :rasies:
- UserNotFoundError.
+ :returns:
+ None
+ :raises:
+ ItemNotCreatedError
- """
- if response.error is not None:
- raise UserNotFoundError("Exception happened: %s for request %s" %
- (response.error, response.request))
- elif response.status != httplib.OK:
- raise UserNotFoundError("request: %s status: %s body: %s" %
- (response.request, response.status,
- response.body))
+ """
+ if response.error is not None:
+ raise ItemNotCreatedError("Exception happened: %s for request %s" %
+ (response.error, response.request))
+ elif response.status != httplib.CREATED:
+ raise ItemNotCreatedError("request: %s status: %s body: %s" %
+ (response.request, response.status,
+ response.body))
+ return None
- data = json.loads(response.body) # convert json string to dict
- return data
+ def aget_item(self, iid):
+ """Asynchronously get item
- def adelete_user(self, uid):
- """Asynchronously delete user.
+ :param iid: item id. type str.
- :param uid: user id. type str.
+ :returns:
+ AsyncRequest object. You should call the aresp() method using this
+ AsyncRequest object as argument to get the final result or status
+ of this asynchronous request.
+ """
+ enc_iid = urllib.quote(iid, "")
+ path = "%s/items/%s.json" % (self.apiversion, enc_iid)
+ request = AsyncRequest("GET", path, pio_appkey=self.appkey)
+ request.set_rfunc(self._aget_item_resp)
+ self._connection.make_request(request)
+ return request
- :returns:
- AsyncRequest object. You should call the aresp() method using this
- AsyncRequest object as argument to get the final result or status
- of this asynchronous request.
- """
+ def _aget_item_resp(self, response):
+ """Private function to handle the AsyncResponse of the aget_item
+ request
- enc_uid = urllib.quote(uid, "") # replace special char with %xx
- path = "%s/users/%s.json" % (self.apiversion, enc_uid)
- request = AsyncRequest("DELETE", path, pio_appkey=self.appkey)
- request.set_rfunc(self._adelete_user_resp)
- self._connection.make_request(request)
+ :param response: AsyncResponse object.
- return request
+ :returns:
+ item data in dictionary format.
- def _adelete_user_resp(self, response):
- """Private function to handle the AsyncResponse of the adelete_user
- request.
+ :raises:
+ ItemNotFoundError.
- :param response: AsyncResponse object.
+ """
+ if response.error is not None:
+ raise ItemNotFoundError("Exception happened: %s for request %s" %
+ (response.error, response.request))
+ elif response.status != httplib.OK:
+ raise ItemNotFoundError("request: %s status: %s body: %s" %
+ (response.request, response.status,
+ response.body))
- :returns:
- None.
+ data = json.loads(response.body) # convert json string to dict
+ if "pio_itypes" in data:
+ # convert from list to tuple
+ data["pio_itypes"] = tuple(data["pio_itypes"])
- :raises:
- UserNotDeletedError.
+ return data
- """
- if response.error is not None:
- raise UserNotDeletedError("Exception happened: %s for request %s" %
- (response.error, response.request))
- elif response.status != httplib.OK:
- raise UserNotDeletedError("request: %s status: %s body: %s" %
- (response.request, response.status,
- response.body))
- return None
+ def adelete_item(self, iid):
+ """Asynchronously delete item
- def acreate_item(self, iid, itypes, params={}):
- """Asynchronously create item.
+ :param iid: item id. type str.
- :param iid: item id. type str.
- :param itypes: item types. Tuple of Str.
- For example, if this item belongs to item types "t1", "t2",
- "t3", "t4",then itypes=("t1", "t2", "t3", "t4").
- NOTE: if this item belongs to only one itype, use tuple of one
- element, eg. itypes=("t1",)
- :param params: optional attributes. type dictionary.
- For example,
- { 'custom': 'value', 'pio_inactive' : True,
- 'pio_latlng': [4.5,67.8] }
- :returns:
- AsyncRequest object. You should call the aresp() method using this
- AsyncRequest object as argument to get the final result or status
- of this asynchronous request.
- """
- itypes_str = ",".join(itypes) # join items with ","
+ :returns:
+ AsyncRequest object. You should call the aresp() method using this
+ AsyncRequest object as argument to get the final result or status
+ of this asynchronous request.
+ """
- if "pio_latlng" in params:
- params["pio_latlng"] = ",".join(map(str, params["pio_latlng"]))
- if "pio_inactive" in params:
- params["pio_inactive"] = str(params["pio_inactive"]).lower()
+ enc_iid = urllib.quote(iid, "")
+ path = "%s/items/%s.json" % (self.apiversion, enc_iid)
+ request = AsyncRequest("DELETE", path, pio_appkey=self.appkey)
+ request.set_rfunc(self._adelete_item_resp)
+ self._connection.make_request(request)
+ return request
- path = "%s/items.json" % self.apiversion
- request = AsyncRequest("POST", path, pio_appkey=self.appkey,
- pio_iid=iid, pio_itypes=itypes_str, **params)
- request.set_rfunc(self._acreate_item_resp)
- self._connection.make_request(request)
- return request
+ def _adelete_item_resp(self, response):
+ """Private function to handle the AsyncResponse of the adelete_item
+ request
- def _acreate_item_resp(self, response):
- """Private function to handle the AsyncResponse of the acreate_item
- request
+ :param response: AsyncResponse object
- :param response: AsyncResponse object.
+ :returns:
+ None
- :returns:
- None
- :raises:
- ItemNotCreatedError
+ :raises:
+ ItemNotDeletedError
+ """
+ if response.error is not None:
+ raise ItemNotDeletedError("Exception happened: %s for request %s" %
+ (response.error, response.request))
+ elif response.status != httplib.OK:
+ raise ItemNotDeletedError("request: %s status: %s body: %s" %
+ (response.request, response.status,
+ response.body))
+ return None
- """
- if response.error is not None:
- raise ItemNotCreatedError("Exception happened: %s for request %s" %
- (response.error, response.request))
- elif response.status != httplib.CREATED:
- raise ItemNotCreatedError("request: %s status: %s body: %s" %
- (response.request, response.status,
- response.body))
- return None
+ def _aget_user_itemrec_topn(self, engine, uid, n, params={}):
+ """Private function to asynchronously get recommendations for user
- def aget_item(self, iid):
- """Asynchronously get item
+ :param engine: name of the prediction engine. type str.
+ :param uid: user id. type str.
+ :param n: number of recommendation. type int.
+ :param params: optional parameters. type dictionary
+ For example, { 'pio_itypes' : ("t1","t2") }
+ :returns:
+ AsyncRequest object. You should call the aresp() method using this
+ AsyncRequest object as argument to get the final result or status
+ of this asynchronous request.
+ """
+ if "pio_itypes" in params:
+ params["pio_itypes"] = ",".join(params["pio_itypes"])
+ if "pio_latlng" in params:
+ params["pio_latlng"] = ",".join(map(str, params["pio_latlng"]))
+ if "pio_attributes" in params:
+ params["pio_attributes"] = ",".join(params["pio_attributes"])
- :param iid: item id. type str.
+ path = "%s/engines/itemrec/%s/topn.json" % (self.apiversion, engine)
+ request = AsyncRequest("GET", path, pio_appkey=self.appkey,
+ pio_uid=uid, pio_n=n, **params)
+ request.set_rfunc(self._aget_user_itemrec_topn_resp)
+ self._connection.make_request(request)
+ return request
- :returns:
- AsyncRequest object. You should call the aresp() method using this
- AsyncRequest object as argument to get the final result or status
- of this asynchronous request.
- """
- enc_iid = urllib.quote(iid, "")
- path = "%s/items/%s.json" % (self.apiversion, enc_iid)
- request = AsyncRequest("GET", path, pio_appkey=self.appkey)
- request.set_rfunc(self._aget_item_resp)
- self._connection.make_request(request)
- return request
+ def _aget_user_itemrec_topn_resp(self, response):
+ """Private function to handle the AsyncResponse of the aget_itemrec
+ request
- def _aget_item_resp(self, response):
- """Private function to handle the AsyncResponse of the aget_item
- request
+ :param response: AsyncResponse object
- :param response: AsyncResponse object.
+ :returns:
+ data in dictionary format.
- :returns:
- item data in dictionary format.
+ :raises:
+ ItemRecNotFoundError.
+ """
+ if response.error is not None:
+ raise ItemRecNotFoundError(
+ "Exception happened: %s for request %s" %
+ (response.error, response.request))
+ elif response.status != httplib.OK:
+ raise ItemRecNotFoundError("request: %s status: %s body: %s" %
+ (response.request, response.status,
+ response.body))
- :raises:
- ItemNotFoundError.
+ data = json.loads(response.body) # convert json string to dict
+ return data
- """
- if response.error is not None:
- raise ItemNotFoundError("Exception happened: %s for request %s" %
- (response.error, response.request))
- elif response.status != httplib.OK:
- raise ItemNotFoundError("request: %s status: %s body: %s" %
- (response.request, response.status,
- response.body))
+ def aget_itemrec_topn(self, engine, n, params={}):
+ """Asynchronously get recommendations for the identified user
- data = json.loads(response.body) # convert json string to dict
- if "pio_itypes" in data:
- # convert from list to tuple
- data["pio_itypes"] = tuple(data["pio_itypes"])
+ :param engine: name of the prediction engine. type str.
+ :param n: number of recommendation. type int.
+ :param params: optional parameters. type dictionary
+ For example, { 'pio_itypes' : ("t1",) }
+ :returns:
+ AsyncRequest object. You should call the aresp() method using this
+ AsyncRequest object as argument to get the final result or status
+ of this asynchronous request.
+ """
- return data
+ if self._uid is None:
+ raise InvalidArgumentError(
+ "uid is not identified. Please call identify(uid) first.")
- def adelete_item(self, iid):
- """Asynchronously delete item
+ request = self._aget_user_itemrec_topn(engine, self._uid, n, params)
+ return request
- :param iid: item id. type str.
+ def aget_itemrec(self, uid, n, engine, **params):
+ """Deprecated. Asynchronously get recommendations
- :returns:
- AsyncRequest object. You should call the aresp() method using this
- AsyncRequest object as argument to get the final result or status
- of this asynchronous request.
- """
+ :param uid: user id. type str.
+ :param n: number of recommendation. type int.
+ :param engine: name of the prediction engine. type str.
+ :param params: keyword arguments for optional attributes.
+ For example, pio_latlng=[123.4, 56.7]
+ :returns:
+ AsyncRequest object. You should call the aresp() method using this
+ AsyncRequest object as argument to get the final result or status
+ of this asynchronous request.
+ """
+ request = self._aget_user_itemrec_topn(engine, uid, n, params)
+ return request
- enc_iid = urllib.quote(iid, "")
- path = "%s/items/%s.json" % (self.apiversion, enc_iid)
- request = AsyncRequest("DELETE", path, pio_appkey=self.appkey)
- request.set_rfunc(self._adelete_item_resp)
- self._connection.make_request(request)
- return request
+ def _aget_itemsim_topn(self, engine, iid, n, params={}):
+ """Private function to asynchronously get top n similar items of the
+ item
- def _adelete_item_resp(self, response):
- """Private function to handle the AsyncResponse of the adelete_item
- request
+ :param engine: name of the prediction engine. type str.
+ :param iid: item id. type str.
+ :param n: number of similar items. type int.
+ :param params: optional parameters. type dictionary
+ For example, { 'pio_itypes' : ("t1","t2") }
+ :returns:
+ AsyncRequest object. You should call the aresp() method using this
+ AsyncRequest object as argument to get the final result or status
+ of this asynchronous request.
+ """
+ if "pio_itypes" in params:
+ params["pio_itypes"] = ",".join(params["pio_itypes"])
+ if "pio_latlng" in params:
+ params["pio_latlng"] = ",".join(map(str, params["pio_latlng"]))
+ if "pio_attributes" in params:
+ params["pio_attributes"] = ",".join(params["pio_attributes"])
- :param response: AsyncResponse object
+ path = "%s/engines/itemsim/%s/topn.json" % (self.apiversion, engine)
+ request = AsyncRequest("GET", path, pio_appkey=self.appkey,
+ pio_iid=iid, pio_n=n, **params)
+ request.set_rfunc(self._aget_itemsim_topn_resp)
+ self._connection.make_request(request)
+ return request
- :returns:
- None
+ def _aget_itemsim_topn_resp(self, response):
+ """Private function to handle the AsyncResponse of the aget_itemsim
+ request
- :raises:
- ItemNotDeletedError
- """
- if response.error is not None:
- raise ItemNotDeletedError("Exception happened: %s for request %s" %
- (response.error, response.request))
- elif response.status != httplib.OK:
- raise ItemNotDeletedError("request: %s status: %s body: %s" %
- (response.request, response.status,
- response.body))
- return None
+ :param response: AsyncResponse object
- def _aget_user_itemrec_topn(self, engine, uid, n, params={}):
- """Private function to asynchronously get recommendations for user
+ :returns:
+ data in dictionary format.
- :param engine: name of the prediction engine. type str.
- :param uid: user id. type str.
- :param n: number of recommendation. type int.
- :param params: optional parameters. type dictionary
- For example, { 'pio_itypes' : ("t1","t2") }
- :returns:
- AsyncRequest object. You should call the aresp() method using this
- AsyncRequest object as argument to get the final result or status
- of this asynchronous request.
- """
- if "pio_itypes" in params:
- params["pio_itypes"] = ",".join(params["pio_itypes"])
- if "pio_latlng" in params:
- params["pio_latlng"] = ",".join(map(str, params["pio_latlng"]))
- if "pio_attributes" in params:
- params["pio_attributes"] = ",".join(params["pio_attributes"])
+ :raises:
+ ItemSimNotFoundError.
+ """
+ if response.error is not None:
+ raise ItemSimNotFoundError(
+ "Exception happened: %s for request %s" %
+ (response.error, response.request))
+ elif response.status != httplib.OK:
+ raise ItemSimNotFoundError("request: %s status: %s body: %s" %
+ (response.request, response.status,
+ response.body))
- path = "%s/engines/itemrec/%s/topn.json" % (self.apiversion, engine)
- request = AsyncRequest("GET", path, pio_appkey=self.appkey,
- pio_uid=uid, pio_n=n, **params)
- request.set_rfunc(self._aget_user_itemrec_topn_resp)
- self._connection.make_request(request)
- return request
+ data = json.loads(response.body) # convert json string to dict
+ return data
- def _aget_user_itemrec_topn_resp(self, response):
- """Private function to handle the AsyncResponse of the aget_itemrec
- request
+ def aget_itemsim_topn(self, engine, iid, n, params={}):
+ """Asynchronously get top n similar items of the item
- :param response: AsyncResponse object
+ :param engine: name of the prediction engine. type str.
+ :param iid: item id. type str.
+ :param n: number of similar items. type int.
+ :param params: optional parameters. type dictionary
+ For example, { 'pio_itypes' : ("t1",) }
+ :returns:
+ AsyncRequest object. You should call the aresp() method using this
+ AsyncRequest object as argument to get the final result or status
+ of this asynchronous request.
+ """
- :returns:
- data in dictionary format.
+ request = self._aget_itemsim_topn(engine, iid, n, params)
+ return request
- :raises:
- ItemRecNotFoundError.
- """
- if response.error is not None:
- raise ItemRecNotFoundError(
- "Exception happened: %s for request %s" %
- (response.error, response.request))
- elif response.status != httplib.OK:
- raise ItemRecNotFoundError("request: %s status: %s body: %s" %
- (response.request, response.status,
- response.body))
+ def _aget_user_itemrank_ranked(self, engine, uid, iids, params={}):
+ """Private function to asynchronously get ranked item for user
- data = json.loads(response.body) # convert json string to dict
- return data
+ :param engine: name of the prediction engine. type str.
+ :param uid: user id. type str.
+ :param iids: items to be ranked. type list of item ids.
+ For example, ["i0", "i1", "i2"]
+ :param params: optional parameters. type dictionary
+ For example, { 'pio_attributes' : ("name",) }
+ :returns:
+ AsyncRequest object. You should call the aresp() method using this
+ AsyncRequest object as argument to get the final result or status
+ of this asynchronous request.
+ """
+ if "pio_attributes" in params:
+ params["pio_attributes"] = ",".join(params["pio_attributes"])
- def aget_itemrec_topn(self, engine, n, params={}):
- """Asynchronously get recommendations for the identified user
+ pio_iids = ",".join(iids)
- :param engine: name of the prediction engine. type str.
- :param n: number of recommendation. type int.
- :param params: optional parameters. type dictionary
- For example, { 'pio_itypes' : ("t1",) }
- :returns:
- AsyncRequest object. You should call the aresp() method using this
- AsyncRequest object as argument to get the final result or status
- of this asynchronous request.
- """
+ path = "%s/engines/itemrank/%s/ranked.json" % \
+ (self.apiversion, engine)
+ request = AsyncRequest("GET", path, pio_appkey=self.appkey,
+ pio_uid=uid, pio_iids=pio_iids, **params)
+ request.set_rfunc(self._aget_user_itemrank_ranked_resp)
+ self._connection.make_request(request)
+ return request
- if self._uid is None:
- raise InvalidArgumentError(
- "uid is not identified. Please call identify(uid) first.")
+ def _aget_user_itemrank_ranked_resp(self, response):
+ """Private function to handle the AsyncResponse of the
+ aget_itemrank_ranked request
- request = self._aget_user_itemrec_topn(engine, self._uid, n, params)
- return request
+ :param response: AsyncResponse object
- def aget_itemrec(self, uid, n, engine, **params):
- """Deprecated. Asynchronously get recommendations
+ :returns:
+ data in dictionary format.
- :param uid: user id. type str.
- :param n: number of recommendation. type int.
- :param engine: name of the prediction engine. type str.
- :param params: keyword arguments for optional attributes.
- For example, pio_latlng="123.4, 56.7"
- :returns:
- AsyncRequest object. You should call the aresp() method using this
- AsyncRequest object as argument to get the final result or status
- of this asynchronous request.
- """
- request = self._aget_user_itemrec_topn(engine, uid, n, params)
- return request
+ :raises:
+ ItemRankNotFoundError.
+ """
+ if response.error is not None:
+ raise ItemRankNotFoundError(
+ "Exception happened: %s for request %s" %
+ (response.error, response.request))
+ elif response.status != httplib.OK:
+ raise ItemRankNotFoundError("request: %s status: %s body: %s" %
+ (response.request, response.status,
+ response.body))
- def _aget_itemsim_topn(self, engine, iid, n, params={}):
- """Private function to asynchronously get top n similar items of the
- item
+ data = json.loads(response.body) # convert json string to dict
+ return data
- :param engine: name of the prediction engine. type str.
- :param iid: item id. type str.
- :param n: number of similar items. type int.
- :param params: optional parameters. type dictionary
- For example, { 'pio_itypes' : ("t1","t2") }
- :returns:
- AsyncRequest object. You should call the aresp() method using this
- AsyncRequest object as argument to get the final result or status
- of this asynchronous request.
- """
- if "pio_itypes" in params:
- params["pio_itypes"] = ",".join(params["pio_itypes"])
- if "pio_latlng" in params:
- params["pio_latlng"] = ",".join(map(str, params["pio_latlng"]))
- if "pio_attributes" in params:
- params["pio_attributes"] = ",".join(params["pio_attributes"])
+ def aget_itemrank_ranked(self, engine, iids, params={}):
+ """Asynchronously get ranked item for user
- path = "%s/engines/itemsim/%s/topn.json" % (self.apiversion, engine)
- request = AsyncRequest("GET", path, pio_appkey=self.appkey,
- pio_iid=iid, pio_n=n, **params)
- request.set_rfunc(self._aget_itemsim_topn_resp)
- self._connection.make_request(request)
- return request
+ :param engine: name of the prediction engine. type str.
+ :param iids: items to be ranked. type list of item ids.
+ For example, ["i0", "i1", "i2"]
+ :param params: optional parameters. type dictionary
+ For example, { 'pio_attributes' : ("name",) }
+ :returns:
+ AsyncRequest object. You should call the aresp() method using this
+ AsyncRequest object as argument to get the final result or status
+ of this asynchronous request.
+ """
- def _aget_itemsim_topn_resp(self, response):
- """Private function to handle the AsyncResponse of the aget_itemsim
- request
+ if self._uid is None:
+ raise InvalidArgumentError(
+ "uid is not identified. Please call identify(uid) first.")
- :param response: AsyncResponse object
+ request = self._aget_user_itemrank_ranked(engine,
+ self._uid, iids, params)
+ return request
- :returns:
- data in dictionary format.
+ def _auser_action_on_item(self, action, uid, iid, params):
+ """Private function to asynchronously create a user action on an item
- :raises:
- ItemSimNotFoundError.
- """
- if response.error is not None:
- raise ItemSimNotFoundError(
- "Exception happened: %s for request %s" %
- (response.error, response.request))
- elif response.status != httplib.OK:
- raise ItemSimNotFoundError("request: %s status: %s body: %s" %
- (response.request, response.status,
- response.body))
-
- data = json.loads(response.body) # convert json string to dict
- return data
-
- def aget_itemsim_topn(self, engine, iid, n, params={}):
- """Asynchronously get top n similar items of the item
-
- :param engine: name of the prediction engine. type str.
- :param iid: item id. type str.
- :param n: number of similar items. type int.
- :param params: optional parameters. type dictionary
- For example, { 'pio_itypes' : ("t1",) }
- :returns:
- AsyncRequest object. You should call the aresp() method using this
- AsyncRequest object as argument to get the final result or status
- of this asynchronous request.
- """
+ :param action: action type. type str. ("like", "dislike", "conversion",
+ "rate", "view")
+ :param uid: user id. type str or int.
+ :param iid: item id. type str or int.
+ :param params: optional attributes. type dictionary.
+ For example, { 'pio_rate' : 4, 'pio_latlng' : [1.23,4.56] }
+ NOTE: For "rate" action, pio_rate attribute is required.
+ integer value of 1-5 (1 is least preferred and 5 is most
+ preferred)
- request = self._aget_itemsim_topn(engine, iid, n, params)
- return request
+ :returns:
+ AsyncRequest object. You should call the aresp() method using this
+ AsyncRequest object as argument to get the final result or status
+ of this asynchronous request.
+ """
+ if "pio_latlng" in params:
+ params["pio_latlng"] = ",".join(map(str, params["pio_latlng"]))
- def _auser_action_on_item(self, action, uid, iid, params):
- """Private function to asynchronously create an user action on an item
+ path = "%s/actions/u2i.json" % (self.apiversion)
+ request = AsyncRequest("POST", path, pio_appkey=self.appkey,
+ pio_action=action, pio_uid=uid, pio_iid=iid,
+ **params)
+ request.set_rfunc(self._auser_action_on_item_resp)
+ self._connection.make_request(request)
+ return request
- :param action: action type. type str. ("like", "dislike", "conversion",
- "rate", "view")
- :param uid: user id. type str or int.
- :param iid: item id. type str or int.
- :param params: optional attributes. type dictionary.
- For example, { 'pio_rate' : 4, 'pio_latlng' : [1.23,4.56] }
- NOTE: For "rate" action, pio_rate attribute is required.
- integer value of 1-5 (1 is least preferred and 5 is most
- preferred)
+ def _auser_action_on_item_resp(self, response):
+ """Private function to handle the AsyncResponse of the
+ _auser_action_on_item request
- :returns:
- AsyncRequest object. You should call the aresp() method using this
- AsyncRequest object as argument to get the final result or status
- of this asynchronous request.
- """
- if "pio_latlng" in params:
- params["pio_latlng"] = ",".join(map(str, params["pio_latlng"]))
+ :param response: AsyncResponse object
- path = "%s/actions/u2i.json" % (self.apiversion)
- request = AsyncRequest("POST", path, pio_appkey=self.appkey,
- pio_action=action, pio_uid=uid, pio_iid=iid,
- **params)
- request.set_rfunc(self._auser_action_on_item_resp)
- self._connection.make_request(request)
- return request
+ :returns:
+ None
- def _auser_action_on_item_resp(self, response):
- """Private function to handle the AsyncResponse of the
- _auser_action_on_item request
+ :raises:
+ U2IActionNotCreatedError
+ """
+ if response.error is not None:
+ raise U2IActionNotCreatedError(
+ "Exception happened: %s for request %s" %
+ (response.error, response.request))
+ elif response.status != httplib.CREATED:
+ raise U2IActionNotCreatedError("request: %s status: %s body: %s" %
+ (response.request, response.status,
+ response.body))
+ return None
- :param response: AsyncResponse object
+ def arecord_action_on_item(self, action, iid, params={}):
+ """Asynchronously create action on item
- :returns:
- None
+ :param action: action name. type String. For example, "rate", "like",
+ etc
+ :param iid: item id. type str or int.
+ :param params: optional attributes. type dictionary.
+ For example, { 'pio_rate' : 4, 'pio_latlng': [4.5,67.8] }
+ NOTE: For "rate" action, pio_rate attribute is required.
+ integer value of 1-5 (1 is least preferred and 5 is most
+ preferred)
- :raises:
- U2IActionNotCreatedError
- """
- if response.error is not None:
- raise U2IActionNotCreatedError(
- "Exception happened: %s for request %s" %
- (response.error, response.request))
- elif response.status != httplib.CREATED:
- raise U2IActionNotCreatedError("request: %s status: %s body: %s" %
- (response.request, response.status,
- response.body))
- return None
+ :returns:
+ AsyncRequest object. You should call the aresp() method using this
+ AsyncRequest object as argument to get the final result or status
+ of this asynchronous request.
- def arecord_action_on_item(self, action, iid, params={}):
- """Asynchronously create action on item
+ :raises:
+ U2IActionNotCreatedError
+ """
- :param action: action name. type String. For example, "rate", "like",
- etc
- :param iid: item id. type str or int.
- :param params: optional attributes. type dictionary.
- For example, { 'pio_rate' : 4, 'pio_latlng': [4.5,67.8] }
- NOTE: For "rate" action, pio_rate attribute is required.
- integer value of 1-5 (1 is least preferred and 5 is most
- preferred)
+ if self._uid is None:
+ raise InvalidArgumentError(
+ "uid is not identified. Please call identify(uid) first.")
- :returns:
- AsyncRequest object. You should call the aresp() method using this
- AsyncRequest object as argument to get the final result or status
- of this asynchronous request.
+ request = self._auser_action_on_item(action, self._uid, iid, params)
+ return request
- :raises:
- U2IActionNotCreatedError
- """
+ def auser_conversion_item(self, uid, iid, **params):
+ """Deprecated. Asynchronously create a user conversion action on an
+ item
- if self._uid is None:
- raise InvalidArgumentError(
- "uid is not identified. Please call identify(uid) first.")
+ :param uid: user id. type str.
+ :param iid: item id. type str.
+ :param params: keyword arguments for optional attributes.
+ For example, pio_latlng=[123.4, 56.7]
+ :returns:
+ AsyncRequest object. You should call the aresp() method using this
+ AsyncRequest object as argument to get the final result or status
+ of this asynchronous request.
+ """
+ request = self._auser_action_on_item(CONVERSION_API, uid, iid, params)
+ return request
- request = self._auser_action_on_item(action, self._uid, iid, params)
- return request
+ def auser_dislike_item(self, uid, iid, **params):
+ """Deprecated. Asynchronously create a user dislike action on an item
- def auser_conversion_item(self, uid, iid, **params):
- """Deprecated. Asynchronously create an user conversion action on an
- item
+ :param uid: user id. type str.
+ :param iid: item id. type str.
+ :param params: keyword arguments for optional attributes.
+ For example, pio_latlng=[123.4, 56.7]
+ :returns:
+ AsyncRequest object. You should call the aresp() method using this
+ AsyncRequest object as argument to get the final result or status
+ of this asynchronous request.
+ """
+ request = self._auser_action_on_item(DISLIKE_API, uid, iid, params)
+ return request
- :param uid: user id. type str.
- :param iid: item id. type str.
- :param params: keyword arguments for optional attributes.
- For example, pio_latlng=[123.4, 56.7]
- :returns:
- AsyncRequest object. You should call the aresp() method using this
- AsyncRequest object as argument to get the final result or status
- of this asynchronous request.
- """
- request = self._auser_action_on_item(CONVERSION_API, uid, iid, params)
- return request
+ def auser_like_item(self, uid, iid, **params):
+ """Deprecated. Asynchronously create an user like action on an item
- def auser_dislike_item(self, uid, iid, **params):
- """Deprecated. Asynchronously create an user dislike action on an item
+ :param uid: user id. type str.
+ :param iid: item id. type str.
+ :param params: keyword arguments for optional attributes.
+ For example, pio_latlng=[123.4, 56.7]
+ :returns:
+ AsyncRequest object. You should call the aresp() method using this
+ AsyncRequest object as argument to get the final result or status
+ of this asynchronous request.
+ """
+ request = self._auser_action_on_item(LIKE_API, uid, iid, params)
+ return request
- :param uid: user id. type str.
- :param iid: item id. type str.
- :param params: keyword arguments for optional attributes.
- For example, pio_latlng=[123.4, 56.7]
- :returns:
- AsyncRequest object. You should call the aresp() method using this
- AsyncRequest object as argument to get the final result or status
- of this asynchronous request.
- """
- request = self._auser_action_on_item(DISLIKE_API, uid, iid, params)
- return request
+ def auser_rate_item(self, uid, iid, rate, **params):
+ """Deprecated. Asynchronously create an user rate action on an item
- def auser_like_item(self, uid, iid, **params):
- """Deprecated. Asynchronously create an user like action on an item
+ :param uid: user id. type str.
+ :param iid: item id. type str.
+ :param rate: rating. integer value of 1-5 (1 is least preferred and 5
+ is most preferred)
+ :param params: keyword arguments for optional attributes.
+ For example, pio_latlng=[123.4, 56.7]
+ :returns:
+ AsyncRequest object. You should call the aresp() method using this
+ AsyncRequest object as argument to get the final result or status
+ of this asynchronous request.
+ """
- :param uid: user id. type str.
- :param iid: item id. type str.
- :param params: keyword arguments for optional attributes.
- For example, pio_latlng=[123.4, 56.7]
- :returns:
- AsyncRequest object. You should call the aresp() method using this
- AsyncRequest object as argument to get the final result or status
- of this asynchronous request.
- """
- request = self._auser_action_on_item(LIKE_API, uid, iid, params)
- return request
+ params['pio_rate'] = rate
+ request = self._auser_action_on_item(RATE_API, uid, iid, params)
+ return request
- def auser_rate_item(self, uid, iid, rate, **params):
- """Deprecated. Asynchronously create an user rate action on an item
+ def auser_view_item(self, uid, iid, **params):
+ """Deprecated. Asynchronously create an user view action on an item
- :param uid: user id. type str.
- :param iid: item id. type str.
- :param rate: rating. integer value of 1-5 (1 is least preferred and 5
- is most preferred)
- :param params: keyword arguments for optional attributes.
- For example, pio_latlng=[123.4, 56.7]
- :returns:
- AsyncRequest object. You should call the aresp() method using this
- AsyncRequest object as argument to get the final result or status
- of this asynchronous request.
- """
+ :param uid: user id. type str.
+ :param iid: item id. type str.
+ :param params: keyword arguments for optional attributes.
+ For example, pio_latlng=[123.4, 56.7]
+ :returns:
+ AsyncRequest object. You should call the aresp() method using this
+ AsyncRequest object as argument to get the final result or status
+ of this asynchronous request.
+ """
+ request = self._auser_action_on_item(VIEW_API, uid, iid, params)
+ return request
- params['pio_rate'] = rate
- request = self._auser_action_on_item(RATE_API, uid, iid, params)
- return request
+ def aresp(self, request):
+ """Get the result of the asynchronous request
- def auser_view_item(self, uid, iid, **params):
- """Deprecated. Asynchronously create an user view action on an item
+ :param request: AsyncRequest object. This object must be returned by
+ the asynchronous request function
+ For example, to get the result of a aget_user()
+ request, call this aresp() with the argument of
+ AsyncRequest object returned by aget_user().
- :param uid: user id. type str.
- :param iid: item id. type str.
- :param params: keyword arguments for optional attributes.
- For example, pio_latlng=[123.4, 56.7]
- :returns:
- AsyncRequest object. You should call the aresp() method using this
- AsyncRequest object as argument to get the final result or status
- of this asynchronous request.
- """
- request = self._auser_action_on_item(VIEW_API, uid, iid, params)
- return request
+ :returns:
+ The result of this AsyncRequest. The return type is the same as
+ the return type of corresponding blocking request.
- def aresp(self, request):
- """Get the result of the asynchronous request
+ For example,
- :param request: AsyncRequest object. This object must be returned by
- the asynchronous request function
- For example, to get the result of a aget_user()
- request, call this aresp() with the argument of
- AsyncRequest object returned by aget_user().
+ Calling aresp() with acreate_user() AsyncRequest returns the same
+ type as create_user(), which is None.
- :returns:
- The result of this AsyncRequest. The return type is the same as
- the return type of corresponding blocking request.
+ Calling aresp() with aget_user() AsyncRequest returns the same
+ type as get_user(), which is dictionary data.
- For example,
+ :raises:
+ Exception may be raised if there is error happened. The type of
+ exception is the same as exception type of the corresponding
+ blocking request.
- Calling aresp() with acreate_user() AsyncRequest returns the same
- type as create_user(), which is None.
+ For example,
- Calling aresp() with aget_user() AsyncRequest returns the same
- type as get_user(), which is dictionary data.
+ Calling aresp() with acreate_user() AsyncRequest may raise
+ UserNotCreatedError exception.
- :raises:
- Exception may be raised if there is error happened. The type of
- exception is the same as exception type of the correspdoning
- blocking request.
+ Calling aresp() with aget_user() AsyncRequest may raise
+ UserNotFoundError exception.
- For example,
+ """
+ response = request.get_response()
+ result = request.rfunc(response)
+ return result
- Calling aresp() with acreate_user() AsyncRequest may raise
- UserNotCreatedError exception.
+ def create_user(self, uid, params={}):
+ """Blocking request to create user
- Calling aresp() with aget_user() AsyncRequest may raise
- UserNotFoundError exception.
+ :param uid: user id. type str.
+ :param params: optional attributes. type dictionary.
+ For example, { 'custom': 'value', 'pio_inactive' : True,
+ 'pio_latlng': [4.5,67.8] }
- """
- response = request.get_response()
- result = request.rfunc(response)
- return result
+ :returns:
+ None.
- def create_user(self, uid, params={}):
- """Blocking request to create user
+ :raises:
+ UserNotCreatedError.
- :param uid: user id. type str.
- :param params: optional attributes. type dictionary.
- For example, { 'custom': 'value', 'pio_inactive' : True,
- 'pio_latlng': [4.5,67.8] }
+ """
+ request = self.acreate_user(uid, params)
+ result = self.aresp(request)
+ return result
- :returns:
- None.
+ def get_user(self, uid):
+ """Blocking request to get user
- :raises:
- UserNotCreatedError.
+ :param uid: user id. type str or int.
- """
- request = self.acreate_user(uid, params)
- result = self.aresp(request)
- return result
+ :returns:
+ User data in Dictionary format.
- def get_user(self, uid):
- """Blocking request to get user
+ :raises:
+ UserNotFoundError.
- :param uid: user id. type str or int.
+ """
+ request = self.aget_user(uid)
+ result = self.aresp(request)
+ return result
- :returns:
- User data in Dictionary format.
+ def delete_user(self, uid):
+ """Blocking request to delete the user
- :rasies:
- UserNotFoundError.
+ :param uid: user id. type str.
- """
- request = self.aget_user(uid)
- result = self.aresp(request)
- return result
+ :returns:
+ None.
- def delete_user(self, uid):
- """Blocking request to delete the user
+ :raises:
+ UserNotDeletedError.
- :param uid: user id. type str.
+ """
+ request = self.adelete_user(uid)
+ result = self.aresp(request)
+ return result
- :returns:
- None.
+ def create_item(self, iid, itypes, params={}):
+ """Blocking request to create item
- :raises:
- UserNotDeletedError.
+ :param iid: item id. type str.
+ :param itypes: item types. Tuple of Str.
+ For example, if this item belongs to item types "t1", "t2",
+ "t3", "t4", then itypes=("t1", "t2", "t3", "t4").
+ NOTE: if this item belongs to only one itype, use tuple of one
+ element, eg. itypes=("t1",)
+ :param params: optional attributes. type dictionary.
+ For example,
+ { 'custom': 'value', 'pio_inactive' : True,
+ 'pio_latlng': [4.5,67.8] }
- """
- request = self.adelete_user(uid)
- result = self.aresp(request)
- return result
+ :returns:
+ None
- def create_item(self, iid, itypes, params={}):
- """Blocking request to create item
+ :raises:
+ ItemNotCreatedError
- :param iid: item id. type str.
- :param itypes: item types. Tuple of Str.
- For example, if this item belongs to item types "t1", "t2",
- "t3", "t4", then itypes=("t1", "t2", "t3", "t4").
- NOTE: if this item belongs to only one itype, use tuple of one
- element, eg. itypes=("t1",)
- :param params: optional attributes. type dictionary.
- For example,
- { 'custom': 'value', 'pio_inactive' : True,
- 'pio_latlng': [4.5,67.8] }
+ """
+ request = self.acreate_item(iid, itypes, params)
+ result = self.aresp(request)
+ return result
- :returns:
- None
+ def get_item(self, iid):
+ """Blocking request to get item
- :raises:
- ItemNotCreatedError
+ :param iid: item id. type str.
- """
- request = self.acreate_item(iid, itypes, params)
- result = self.aresp(request)
- return result
+ :returns:
+ item data in dictionary format.
- def get_item(self, iid):
- """Blocking request to get item
+ :raises:
+ ItemNotFoundError.
- :param iid: item id. type str.
+ """
+ request = self.aget_item(iid)
+ result = self.aresp(request)
+ return result
- :returns:
- item data in dictionary format.
+ def delete_item(self, iid):
+ """Blocking request to delete item
- :raises:
- ItemNotFoundError.
+ :param iid: item id. type str.
- """
- request = self.aget_item(iid)
- result = self.aresp(request)
- return result
+ :returns:
+ None
- def delete_item(self, iid):
- """Blocking request to delete item
+ :raises:
+ ItemNotDeletedError
- :param iid: item id. type str.
+ """
+ request = self.adelete_item(iid)
+ result = self.aresp(request)
+ return result
- :returns:
- None
+ def get_itemrec_topn(self, engine, n, params={}):
+ """Blocking request to get recommendations for the identified user
- :raises:
- ItemNotDeletedError
+ :param engine: name of the prediction engine. type str.
+ :param n: number of recommendation. type int.
+ :param params: optional parameters. type dictionary
+ For example, { 'pio_itypes' : ("t1", "t2") }
+ :returns:
+ data in dictionary format.
- """
- request = self.adelete_item(iid)
- result = self.aresp(request)
- return result
+ :raises:
+ ItemRecNotFoundError.
+ """
+ request = self.aget_itemrec_topn(engine, n, params)
+ result = self.aresp(request)
+ return result
- def get_itemrec_topn(self, engine, n, params={}):
- """Blocking request to get recommendations for the identified user
+ def get_itemrec(self, uid, n, engine, **params):
+ """Deprecated. Blocking request to get recommendations
- :param engine: name of the prediction engine. type str.
- :param n: number of recommendation. type int.
- :param params: optional parameters. type dictionary
- For example, { 'pio_itypes' : ("t1", "t2") }
- :returns:
- data in dictionary format.
+ :param uid: user id. type str or int.
+ :param n: number of recommendation. type int.
+ :param engine: name of the prediction engine. type str.
+ :param params: keyword arguments for optional attributes.
+ For example, pio_latlng=[123.4, 56.7]
- :raises:
- ItemRecNotFoundError.
- """
- request = self.aget_itemrec_topn(engine, n, params)
- result = self.aresp(request)
- return result
+ :returns:
+ data in dictionary format.
- def get_itemrec(self, uid, n, engine, **params):
- """Deprecated. Blocking request to get recommendations
+ :raises:
+ ItemRecNotFoundError.
- :param uid: user id. type str or int.
- :param n: number of recommendation. type int.
- :param engine: name of the prediction engine. type str.
- :param params: keyword arguments for optional attributes.
- For example, pio_latlng=[123.4, 56.7]
+ """
+ request = self.aget_itemrec(uid, n, engine, **params)
+ result = self.aresp(request)
+ return result
- :returns:
- data in dictionary format.
+ def get_itemsim_topn(self, engine, iid, n, params={}):
+ """Blocking request to get top n similar items of the item
- :raises:
- ItemRecNotFoundError.
+ :param engine: name of the prediction engine. type str.
+ :param iid: item id. type str.
+ :param n: number of similar items. type int.
+ :param params: optional parameters. type dictionary
+ For example, { 'pio_itypes' : ("t1",) }
+ :returns:
+ data in dictionary format.
- """
- request = self.aget_itemrec(uid, n, engine, **params)
- result = self.aresp(request)
- return result
+ :raises:
+ ItemSimNotFoundError.
+ """
- def get_itemsim_topn(self, engine, iid, n, params={}):
- """Blocking request to get top n similar items of the item
+ request = self.aget_itemsim_topn(engine, iid, n, params)
+ result = self.aresp(request)
+ return result
- :param engine: name of the prediction engine. type str.
- :param iid: item id. type str.
- :param n: number of similar items. type int.
- :param params: optional parameters. type dictionary
- For example, { 'pio_itypes' : ("t1",) }
- :returns:
- data in dictionary format.
+ def get_itemrank_ranked(self, engine, iids, params={}):
+ """Blocking request to get ranked item for user
- :raises:
- ItemSimNotFoundError.
- """
+ :param engine: name of the prediction engine. type str.
+ :param iids: items to be ranked. type list of item ids.
+ For example, ["i0", "i1", "i2"]
+ :param params: optional parameters. type dictionary
+ For example, { 'pio_attributes' : "name" }
+ :returns:
+ data in dictionary format.
- request = self.aget_itemsim_topn(engine, iid, n, params)
- result = self.aresp(request)
- return result
+ :raises:
+ ItemRankNotFoundError.
+ """
+ request = self.aget_itemrank_ranked(engine, iids, params)
+ result = self.aresp(request)
+ return result
- def record_action_on_item(self, action, iid, params={}):
- """Blocking request to create action on an item
+ def record_action_on_item(self, action, iid, params={}):
+ """Blocking request to create action on an item
- :param action: action name. type String. For example, "rate", "like",
- etc
- :param iid: item id. type str.
- :param params: optional attributes. type dictionary.
- For example, { 'pio_rate' : 4, 'pio_latlng' : [1.23,4.56] }
- NOTE: For "rate" action, pio_rate attribute is required.
- integer value of 1-5 (1 is least preferred and 5 is most
- preferred)
+ :param action: action name. type String. For example, "rate", "like",
+ etc
+ :param iid: item id. type str.
+ :param params: optional attributes. type dictionary.
+ For example, { 'pio_rate' : 4, 'pio_latlng' : [1.23,4.56] }
+ NOTE: For "rate" action, pio_rate attribute is required.
+ integer value of 1-5 (1 is least preferred and 5 is most
+ preferred)
- :returns:
- None
+ :returns:
+ None
- :raises:
- U2IActionNotCreatedError
- """
- request = self.arecord_action_on_item(action, iid, params)
- result = self.aresp(request)
- return result
+ :raises:
+ U2IActionNotCreatedError
+ """
+ request = self.arecord_action_on_item(action, iid, params)
+ result = self.aresp(request)
+ return result
- def user_conversion_item(self, uid, iid, **params):
- """Deprecated. Blocking request to create user conversion action on an
- item
+ def user_conversion_item(self, uid, iid, **params):
+ """Deprecated. Blocking request to create user conversion action on an
+ item
- :param uid: user id. type str.
- :param iid: item id. type str.
- :param params: keyword arguments for optional attributes.
- For example, pio_latlng=[123.4, 56.7]
+ :param uid: user id. type str.
+ :param iid: item id. type str.
+ :param params: keyword arguments for optional attributes.
+ For example, pio_latlng=[123.4, 56.7]
- :returns:
- None
+ :returns:
+ None
- :raises:
- U2IActionNotCreatedError
- """
- request = self.auser_conversion_item(uid, iid, **params)
- result = self.aresp(request)
- return result
+ :raises:
+ U2IActionNotCreatedError
+ """
+ request = self.auser_conversion_item(uid, iid, **params)
+ result = self.aresp(request)
+ return result
- def user_dislike_item(self, uid, iid, **params):
- """Deprecated. Blocking request to create user dislike action on an
- item
+ def user_dislike_item(self, uid, iid, **params):
+ """Deprecated. Blocking request to create user dislike action on an
+ item
- :param uid: user id. type str.
- :param iid: item id. type str.
- :param params: keyword arguments for optional attributes.
- For example, pio_latlng=[123.4, 56.7]
+ :param uid: user id. type str.
+ :param iid: item id. type str.
+ :param params: keyword arguments for optional attributes.
+ For example, pio_latlng=[123.4, 56.7]
- :returns:
- None
+ :returns:
+ None
- :raises:
- U2IActionNotCreatedError
- """
- request = self.auser_dislike_item(uid, iid, **params)
- result = self.aresp(request)
- return result
+ :raises:
+ U2IActionNotCreatedError
+ """
+ request = self.auser_dislike_item(uid, iid, **params)
+ result = self.aresp(request)
+ return result
- def user_like_item(self, uid, iid, **params):
- """Deprecated. Blocking request to create user like action on an item
+ def user_like_item(self, uid, iid, **params):
+ """Deprecated. Blocking request to create user like action on an item
- :param uid: user id. type str.
- :param iid: item id. type str.
- :param params: keyword arguments for optional attributes.
- For example, pio_latlng=[123.4, 56.7]
+ :param uid: user id. type str.
+ :param iid: item id. type str.
+ :param params: keyword arguments for optional attributes.
+ For example, pio_latlng=[123.4, 56.7]
- :returns:
- None
+ :returns:
+ None
- :raises:
- U2IActionNotCreatedError
- """
- request = self.auser_like_item(uid, iid, **params)
- result = self.aresp(request)
- return result
+ :raises:
+ U2IActionNotCreatedError
+ """
+ request = self.auser_like_item(uid, iid, **params)
+ result = self.aresp(request)
+ return result
- def user_rate_item(self, uid, iid, rate, **params):
- """Deprecated. Blocking request to create user rate action on an item
+ def user_rate_item(self, uid, iid, rate, **params):
+ """Deprecated. Blocking request to create user rate action on an item
- :param uid: user id. type str.
- :param iid: item id. type str.
- :param rate: rating. integer value of 1-5 (1 is least preferred and 5
- is most preferred)
- :param params: keyword arguments for optional attributes.
- For example, pio_latlng=[123.4, 56.7]
+ :param uid: user id. type str.
+ :param iid: item id. type str.
+ :param rate: rating. integer value of 1-5 (1 is least preferred and 5
+ is most preferred)
+ :param params: keyword arguments for optional attributes.
+ For example, pio_latlng=[123.4, 56.7]
- :returns:
- None
+ :returns:
+ None
- :raises:
- U2IActionNotCreatedError
- """
- request = self.auser_rate_item(uid, iid, rate, **params)
- result = self.aresp(request)
- return result
+ :raises:
+ U2IActionNotCreatedError
+ """
+ request = self.auser_rate_item(uid, iid, rate, **params)
+ result = self.aresp(request)
+ return result
- def user_view_item(self, uid, iid, **params):
- """Deprecated. Blocking request to create user view action on an item
+ def user_view_item(self, uid, iid, **params):
+ """Deprecated. Blocking request to create user view action on an item
- :param uid: user id. type str.
- :param iid: item id. type str.
- :param params: keyword arguments for optional attributes.
- For example, pio_latlng=[123.4, 56.7]
+ :param uid: user id. type str.
+ :param iid: item id. type str.
+ :param params: keyword arguments for optional attributes.
+ For example, pio_latlng=[123.4, 56.7]
- :returns:
- None
+ :returns:
+ None
- :raises:
- U2IActionNotCreatedError
- """
- request = self.auser_view_item(uid, iid, **params)
- result = self.aresp(request)
- return result
+ :raises:
+ U2IActionNotCreatedError
+ """
+ request = self.auser_view_item(uid, iid, **params)
+ result = self.aresp(request)
+ return result
diff --git a/predictionio/connection.py b/predictionio/connection.py
index c2ee60b..e124c5b 100644
--- a/predictionio/connection.py
+++ b/predictionio/connection.py
@@ -1,29 +1,32 @@
try:
- import Queue
+ import Queue
except ImportError:
- import queue as Queue
+ # pylint: disable=F0401
+ # queue is the Python3 name of the Queue module.
+ import queue as Queue
import threading
try:
- import httplib
+ import httplib
except ImportError:
- from http import client as httplib
+ # pylint: disable=F0401
+ from http import client as httplib
try:
- from urllib import urlencode
+ from urllib import urlencode
except ImportError:
- from urllib.parse import urlencode
+ # pylint: disable=F0401,E0611
+ from urllib.parse import urlencode
-import urllib
import datetime
import logging
# use generators for python2 and python3
try:
- xrange
+ xrange
except NameError:
- xrange = range
+ xrange = range
# some constants
MAX_RETRY = 1 # 0 means no retry
@@ -35,325 +38,325 @@
def enable_log(filename=None):
- global logger
- global DEBUG_LOG
- timestamp = datetime.datetime.today()
- if not filename:
- logfile = "./log/predictionio_%s.log" % timestamp.strftime(
- "%Y-%m-%d_%H:%M:%S.%f")
- else:
- logfile = filename
- logging.basicConfig(filename=logfile,
- filemode='w',
- level=logging.DEBUG,
- format='[%(levelname)s] %(name)s (%(threadName)s) %(message)s')
- logger = logging.getLogger(__name__)
- DEBUG_LOG = True
+ global logger
+ global DEBUG_LOG
+ timestamp = datetime.datetime.today()
+ if not filename:
+ logfile = "./log/predictionio_%s.log" % timestamp.strftime(
+ "%Y-%m-%d_%H:%M:%S.%f")
+ else:
+ logfile = filename
+ logging.basicConfig(filename=logfile,
+ filemode='w',
+ level=logging.DEBUG,
+ format='[%(levelname)s] %(name)s (%(threadName)s) %(message)s')
+ logger = logging.getLogger(__name__)
+ DEBUG_LOG = True
class PredictionIOAPIError(Exception):
- pass
+ pass
class NotSupportMethodError(PredictionIOAPIError):
- pass
+ pass
class ProgramError(PredictionIOAPIError):
- pass
+ pass
class AsyncRequest(object):
- """AsyncRequest object
+ """AsyncRequest object
+
+ """
+
+ def __init__(self, method, path, **params):
+ self.method = method # "GET" "POST" etc
+ # the sub path eg. POST /v1/users.json GET /v1/users/1.json
+ self.path = path
+ # dictionary format eg. {"appkey" : 123, "id" : 3}
+ self.params = params
+ # use queue to implement response, store AsyncResponse object
+ self.response_q = Queue.Queue(1)
+ self.qpath = "%s?%s" % (self.path, urlencode(self.params))
+ self._response = None
+ # response function to be called to handle the response
+ self.rfunc = None
+
+ def __str__(self):
+ return "%s %s %s %s" % (self.method, self.path, self.params,
+ self.qpath)
+
+ def set_rfunc(self, func):
+ self.rfunc = func
+
+ def set_response(self, response):
+ """ store the response
+
+ NOTE: Must be only called once
+ """
+ self.response_q.put(response)
+
+ def get_response(self):
+ """get the response
"""
+ if self._response is None:
+ self._response = self.response_q.get(True) # NOTE: blocking
- def __init__(self, method, path, **params):
- self.method = method # "GET" "POST" etc
- # the sub path eg. POST /v1/users.json GET /v1/users/1.json
- self.path = path
- # dictionary format eg. {"appkey" : 123, "id" : 3}
- self.params = params
- # use queue to implement response, store AsyncResponse object
- self.response_q = Queue.Queue(1)
- self.qpath = "%s?%s" % (self.path, urlencode(self.params))
- self._response = None
- # response function to be called to handle the response
- self.rfunc = None
-
- def __str__(self):
- return "%s %s %s %s" % (self.method, self.path, self.params,
- self.qpath)
-
- def set_rfunc(self, func):
- self.rfunc = func
-
- def set_response(self, response):
- """ store the response
-
- NOTE: Must be only called once
- """
- self.response_q.put(response)
-
- def get_response(self):
- """get the response
-
- """
- if self._response is None:
- self._response = self.response_q.get(True) # NOTE: blocking
-
- return self._response
+ return self._response
class AsyncResponse(object):
- """AsyncResponse object.
+ """AsyncResponse object.
- Store the response of asynchronous request
+ Store the response of asynchronous request
- when get the response, user should check if error is None (which means no
- Exception happens)
- if error is None, then should check if the status is expected
+ when get the response, user should check if error is None (which means no
+ Exception happens)
+ if error is None, then should check if the status is expected
- Attributes:
- error: exception object if any happens
- version: int
- status: int
- reason: str
- headers: dict
- body: str (NOTE: not necessarily can be converted to JSON,
- eg, for GET request to /v1/status)
- request: the corresponding AsyncRequest object
- """
+ Attributes:
+ error: exception object if any happens
+ version: int
+ status: int
+ reason: str
+ headers: dict
+ body: str (NOTE: not necessarily can be converted to JSON,
+ eg, for GET request to /v1/status)
+ request: the corresponding AsyncRequest object
+ """
- def __init__(self):
- self.error = None
- self.version = None
- self.status = None
- self.reason = None
- self.headers = None
- self.body = None
- self.request = None # point back to the request object
+ def __init__(self):
+ self.error = None
+ self.version = None
+ self.status = None
+ self.reason = None
+ self.headers = None
+ self.body = None
+ self.request = None # point back to the request object
- def __str__(self):
- return "e:%s v:%s s:%s r:%s h:%s b:%s" % (self.error, self.version,
- self.status, self.reason,
- self.headers, self.body)
+ def __str__(self):
+ return "e:%s v:%s s:%s r:%s h:%s b:%s" % (self.error, self.version,
+ self.status, self.reason,
+ self.headers, self.body)
- def set_resp(self, version, status, reason, headers, body):
- self.version = version
- self.status = status
- self.reason = reason
- self.headers = headers
- self.body = body
+ def set_resp(self, version, status, reason, headers, body):
+ self.version = version
+ self.status = status
+ self.reason = reason
+ self.headers = headers
+ self.body = body
- def set_error(self, error):
- self.error = error
+ def set_error(self, error):
+ self.error = error
- def set_request(self, request):
- self.request = request
+ def set_request(self, request):
+ self.request = request
class PredictionIOHttpConnection(object):
- def __init__(self, host, https=True, timeout=5):
- if https: # https connection
- self._connection = httplib.HTTPSConnection(host, timeout=timeout)
- else:
- self._connection = httplib.HTTPConnection(host, timeout=timeout)
+ def __init__(self, host, https=True, timeout=5):
+ if https: # https connection
+ self._connection = httplib.HTTPSConnection(host, timeout=timeout)
+ else:
+ self._connection = httplib.HTTPConnection(host, timeout=timeout)
- def connect(self):
- self._connection.connect()
+ def connect(self):
+ self._connection.connect()
- def close(self):
+ def close(self):
+ self._connection.close()
+
+ def request(self, method, url, body={}, headers={}):
+ """
+ http request wrapper function, with retry capability in case of error.
+ catch error exception and store it in AsyncResponse object
+ return AsyncResponse object
+
+ Args:
+ method: http method, type str
+ url: url path, type str
+ body: http request body content, type dict
+ headers: http request headers, type dict
+ """
+
+ response = AsyncResponse()
+
+ try:
+ # number of retry in case of error (minimum 0 means no retry)
+ retry_limit = MAX_RETRY
+ mod_headers = dict(headers) # copy the headers
+ mod_headers["Connection"] = "keep-alive"
+ enc_body = None
+ if body: # if body is not empty
+ enc_body = urlencode(body)
+ mod_headers[
+ "Content-type"] = "application/x-www-form-urlencoded"
+ #mod_headers["Accept"] = "text/plain"
+ except Exception as e:
+ response.set_error(e)
+ return response
+
+ if DEBUG_LOG:
+ logger.debug("Request m:%s u:%s h:%s b:%s", method, url,
+ mod_headers, enc_body)
+ # retry loop
+ for i in xrange(retry_limit + 1):
+ try:
+ if i != 0:
+ if DEBUG_LOG:
+ logger.debug("retry request %s times" % i)
+ if self._connection.sock is None:
+ self._connection.connect()
+ self._connection.request(method, url, enc_body, mod_headers)
+ except Exception as e:
self._connection.close()
-
- def request(self, method, url, body={}, headers={}):
- """
- http request wrapper function, with retry capability in case of error.
- catch error exception and store it in AsyncResponse object
- return AsyncResponse object
-
- Args:
- method: http method, type str
- url: url path, type str
- body: http request body content, type dict
- header: http request header , type dict
- """
-
- response = AsyncResponse()
-
+ if i == retry_limit:
+ # new copy of e created everytime??
+ response.set_error(e)
+ else: # NOTE: this is try's else clause
+ # connect() and request() OK
try:
- # number of retry in case of error (minimum 0 means no retry)
- retry_limit = MAX_RETRY
- mod_headers = dict(headers) # copy the headers
- mod_headers["Connection"] = "keep-alive"
- enc_body = None
- if body: # if body is not empty
- enc_body = urlencode(body)
- mod_headers[
- "Content-type"] = "application/x-www-form-urlencoded"
- #mod_headers["Accept"] = "text/plain"
+ resp = self._connection.getresponse()
except Exception as e:
+ self._connection.close()
+ if i == retry_limit:
response.set_error(e)
- return response
-
- if DEBUG_LOG:
- logger.debug("Request m:%s u:%s h:%s b:%s", method, url,
- mod_headers, enc_body)
- # retry loop
- for i in xrange(retry_limit + 1):
- try:
- if i != 0:
- if DEBUG_LOG:
- logger.debug("retry request %s times" % i)
- if self._connection.sock is None:
- self._connection.connect()
- self._connection.request(method, url, enc_body, mod_headers)
- except Exception as e:
- self._connection.close()
- if i == retry_limit:
- # new copy of e created everytime??
- response.set_error(e)
- else: # NOTE: this is try's else clause
- # connect() and request() OK
- try:
- resp = self._connection.getresponse()
- except Exception as e:
- self._connection.close()
- if i == retry_limit:
- response.set_error(e)
- else: # NOTE: this is try's else clause
- # getresponse() OK
- resp_version = resp.version # int
- resp_status = resp.status # int
- resp_reason = resp.reason # str
- # resp.getheaders() returns list of tuples
- # converted to dict format
- resp_headers = dict(resp.getheaders())
- # NOTE: have to read the response before sending out next
- # http request
- resp_body = resp.read() # str
- response.set_resp(version=resp_version, status=resp_status,
- reason=resp_reason, headers=resp_headers,
- body=resp_body)
- break # exit retry loop
- # end of retry loop
- if DEBUG_LOG:
- logger.debug("Response %s", response)
- return response # AsyncResponse object
+ else: # NOTE: this is try's else clause
+ # getresponse() OK
+ resp_version = resp.version # int
+ resp_status = resp.status # int
+ resp_reason = resp.reason # str
+ # resp.getheaders() returns list of tuples
+ # converted to dict format
+ resp_headers = dict(resp.getheaders())
+ # NOTE: have to read the response before sending out next
+ # http request
+ resp_body = resp.read() # str
+ response.set_resp(version=resp_version, status=resp_status,
+ reason=resp_reason, headers=resp_headers,
+ body=resp_body)
+ break # exit retry loop
+ # end of retry loop
+ if DEBUG_LOG:
+ logger.debug("Response %s", response)
+ return response # AsyncResponse object
def connection_worker(host, request_queue, https=True, timeout=5, loop=True):
- """worker function which establishes connection and wait for request jobs
- from the request_queue
+ """worker function which establishes connection and wait for request jobs
+ from the request_queue
- Args:
- request_queue: the request queue storing the AsyncRequest object
- valid requests:
- GET
- POST
- DELETE
- KILL
- https: HTTPS (True) or HTTP (False)
- timeout: timeout for HTTP connection attempts and requests in seconds
- loop: This worker function stays in a loop waiting for request
- For testing purpose only. should always be set to True.
- """
+ Args:
+ request_queue: the request queue storing the AsyncRequest object
+ valid requests:
+ GET
+ POST
+ DELETE
+ KILL
+ https: HTTPS (True) or HTTP (False)
+ timeout: timeout for HTTP connection attempts and requests in seconds
+ loop: This worker function stays in a loop waiting for requests.
+ For testing purposes only; should always be set to True.
+ """
- connect = PredictionIOHttpConnection(host, https, timeout)
+ connect = PredictionIOHttpConnection(host, https, timeout)
- # loop waiting for job form request queue
- killed = not loop
+ # loop waiting for jobs from the request queue
+ killed = not loop
- while True:
- # print "thread %s waiting for request" % thread.get_ident()
- request = request_queue.get(True) # NOTE: blocking get
- # print "get request %s" % request
- method = request.method
- if method == "GET":
- path = request.qpath
- d = connect.request("GET", path)
- elif method == "POST":
- path = request.path
- body = request.params
- d = connect.request("POST", path, body)
- elif method == "DELETE":
- path = request.qpath
- d = connect.request("DELETE", path)
- elif method == "KILL":
- # tell the thread to kill the connection
- killed = True
- d = AsyncResponse()
- else:
- d = AsyncResponse()
- d.set_error(NotSupportMethodError(
- "Don't Support the method %s" % method))
+ while True:
+ # print "thread %s waiting for request" % thread.get_ident()
+ request = request_queue.get(True) # NOTE: blocking get
+ # print "get request %s" % request
+ method = request.method
+ if method == "GET":
+ path = request.qpath
+ d = connect.request("GET", path)
+ elif method == "POST":
+ path = request.path
+ body = request.params
+ d = connect.request("POST", path, body)
+ elif method == "DELETE":
+ path = request.qpath
+ d = connect.request("DELETE", path)
+ elif method == "KILL":
+ # tell the thread to kill the connection
+ killed = True
+ d = AsyncResponse()
+ else:
+ d = AsyncResponse()
+ d.set_error(NotSupportMethodError(
+ "Don't Support the method %s" % method))
- d.set_request(request)
- request.set_response(d)
- request_queue.task_done()
- if killed:
- break
+ d.set_request(request)
+ request.set_response(d)
+ request_queue.task_done()
+ if killed:
+ break
- # end of while loop
- connect.close()
+ # end of while loop
+ connect.close()
class Connection(object):
- """abstract object for connection with server
+ """abstract object for connection with server
- spawn multiple connection_worker threads to handle jobs in the queue q
+ spawn multiple connection_worker threads to handle jobs in the queue q
+ """
+
+ def __init__(self, host, threads=1, qsize=0, https=True, timeout=5):
+ """constructor
+
+ Args:
+ host: host of the server.
+ threads: type int, number of threads to be spawned
+ qsize: size of the queue q
+ https: indicate it is httpS (True) or http connection (False)
+ timeout: timeout for HTTP connection attempts and requests in
+ seconds
"""
+ self.host = host
+ self.https = https
+ self.q = Queue.Queue(qsize) # if qsize=0, means infinite
+ self.threads = threads
+ self.timeout = timeout
+ # start thread based on threads number
+ self.tid = {} # dictionary of thread object
- def __init__(self, host, threads=1, qsize=0, https=True, timeout=5):
- """constructor
+ for i in xrange(threads):
+ tname = "PredictionIOThread-%s" % i # thread name
+ self.tid[i] = threading.Thread(
+ target=connection_worker, name=tname,
+ kwargs={'host': self.host, 'request_queue': self.q,
+ 'https': self.https, 'timeout': self.timeout})
+ self.tid[i].setDaemon(True)
+ self.tid[i].start()
- Args:
- host: host of the server.
- threads: type int, number of threads to be spawn
- qsize: size of the queue q
- https: indicate it is httpS (True) or http connection (False)
- timeout: timeout for HTTP connection attempts and requests in
- seconds
- """
- self.host = host
- self.https = https
- self.q = Queue.Queue(qsize) # if qsize=0, means infinite
- self.threads = threads
- self.timeout = timeout
- # start thread based on threads number
- self.tid = {} # dictionary of thread object
+ def make_request(self, request):
+ """put the request into the q
+ """
+ self.q.put(request)
- for i in xrange(threads):
- tname = "PredictionIOThread-%s" % i # thread name
- self.tid[i] = threading.Thread(
- target=connection_worker, name=tname,
- kwargs={'host':self.host, 'request_queue':self.q,
- 'https':self.https, 'timeout':self.timeout})
- self.tid[i].setDaemon(True)
- self.tid[i].start()
+ def pending_requests(self):
+ """number of pending requests in the queue
+ """
+ return self.q.qsize()
- def make_request(self, request):
- """put the request into the q
- """
- self.q.put(request)
+ def close(self):
+ """close this Connection. Call this when main program exits
+ """
+ # send one KILL message to q for each worker thread
+ for i in xrange(self.threads):
+ self.make_request(AsyncRequest("KILL", ""))
- def pending_requests(self):
- """number of pending requests in the queue
- """
- return self.q.qsize()
+ self.q.join() # wait for q empty
- def close(self):
- """close this Connection. Call this when main program exits
- """
- # set kill message to q
- for i in xrange(self.threads):
- self.make_request(AsyncRequest("KILL", ""))
-
- self.q.join() # wait for q empty
-
- for i in xrange(self.threads): # wait for all thread finish
- self.tid[i].join()
+ for i in xrange(self.threads): # wait for all thread finish
+ self.tid[i].join()