Small fixes for PEP8 compliance
diff --git a/predictionio/__init__.py b/predictionio/__init__.py
index 5f981bf..8863f65 100644
--- a/predictionio/__init__.py
+++ b/predictionio/__init__.py
@@ -20,54 +20,76 @@
from predictionio.connection import Connection
from predictionio.connection import AsyncRequest
-from predictionio.connection import AsyncResponse
from predictionio.connection import PredictionIOAPIError
"""Error exception defined for this API
Should be handled by the user
"""
+
+
class ServerStatusError(PredictionIOAPIError):
+
"Error happened when tried to get status of the API server"
pass
+
class UserNotCreatedError(PredictionIOAPIError):
+
"Error happened when tried to create user"
pass
+
class UserNotFoundError(PredictionIOAPIError):
+
"Error happened when tried to get user"
pass
+
class UserNotDeletedError(PredictionIOAPIError):
+
"Error happened when tried to delete user"
pass
+
class ItemNotCreatedError(PredictionIOAPIError):
+
"Error happened when tried to create item"
pass
+
class ItemNotFoundError(PredictionIOAPIError):
+
"Error happened when tried to get item"
pass
+
class ItemNotDeletedError(PredictionIOAPIError):
+
"Error happened when tried to delete item"
pass
+
class U2IActionNotCreatedError(PredictionIOAPIError):
+
"Error happened when tried to create user-to-item action"
pass
+
class ItemRecNotFoundError(PredictionIOAPIError):
+
"Error happened when tried to get item recommendation"
pass
+
class ItemSimNotFoundError(PredictionIOAPIError):
+
"Error happened when tried to get similar items"
pass
+
class InvalidArgumentError(PredictionIOAPIError):
+
"Arguments are not valid"
pass
@@ -78,21 +100,29 @@
CONVERSION_API = "conversion"
RATE_API = "rate"
+
class Client:
+
"""PredictionIO client object.
- This is an object representing a PredictionIO's client. This object provides methods for making PredictionIO API requests.
+ This is an object representing a PredictionIO's client. This object
+ provides methods for making PredictionIO API requests.
:param appkey: the App Key provided by PredictionIO.
- :param threads: number of threads to handle PredictionIO API requests. Must be >= 1.
+ :param threads: number of threads to handle PredictionIO API requests.
+ Must be >= 1.
:param apiurl: the PredictionIO API URL path.
- :param apiversion: the PredictionIO API version. (optional) (eg. "", or "/v1")
+ :param apiversion: the PredictionIO API version. (optional)
+ (eg. "", or "/v1")
:param qsize: the max size of the request queue (optional).
- The asynchronous request becomes blocking once this size has been reached, until the queued requests are handled.
+ The asynchronous request becomes blocking once this size has been
+ reached, until the queued requests are handled.
Default value is 0, which means infinite queue size.
"""
- def __init__(self, appkey, threads=1, apiurl="http://localhost:8000", apiversion = "", qsize=0):
+
+ def __init__(self, appkey, threads=1, apiurl="http://localhost:8000",
+ apiversion="", qsize=0):
"""Constructor of Client object.
"""
@@ -107,20 +137,22 @@
http_pattern = r'^http://(.*)'
m = re.match(https_pattern, apiurl)
self.https = True
- if m is None: # not matching https
+ if m is None: # not matching https
m = re.match(http_pattern, apiurl)
self.https = False
- if m is None: # not matching http either
+ if m is None: # not matching http either
raise InvalidArgumentError("apiurl is not valid: %s" % apiurl)
self.host = m.group(1)
- self._uid = None # identified uid
- self._connection = Connection(host=self.host, threads=self.threads, qsize=self.qsize, https=self.https)
+ self._uid = None # identified uid
+ self._connection = Connection(host=self.host, threads=self.threads,
+ qsize=self.qsize, https=self.https)
def close(self):
"""Close this client and the connection.
- Call this method when you want to completely terminate the connection with PredictionIO.
+ Call this method when you want to completely terminate the connection
+ with PredictionIO.
It will wait for all pending requests to finish.
"""
self._connection.close()
@@ -159,13 +191,14 @@
def _aget_status_resp(self, response):
"""Handle the AsyncResponse of get status request"""
if response.error is not None:
- raise ServerStatusError("Exception happened: %s for request %s" % \
+ raise ServerStatusError("Exception happened: %s for request %s" %
(response.error, response.request))
elif response.status != httplib.OK:
- raise ServerStatusError("request: %s status: %s body: %s" % \
- (response.request, response.status, response.body))
+ raise ServerStatusError("request: %s status: %s body: %s" %
+ (response.request, response.status,
+ response.body))
- #data = json.loads(response.body) # convert json string to dict
+ # data = json.loads(response.body) # convert json string to dict
return response.body
def acreate_user(self, uid, params={}):
@@ -173,28 +206,32 @@
:param uid: user id. type str.
:param params: optional attributes. type dictionary.
- For example, { 'custom': 'value', 'pio_inactive' : True, 'pio_latlng': [4.5,67.8] }
-
+ For example, { 'custom': 'value', 'pio_inactive' : True,
+ 'pio_latlng': [4.5,67.8] }
+
:returns:
- AsyncRequest object. You should call the aresp() method using this AsyncRequest
- object as argument to get the final result or status of this asynchronous request.
+ AsyncRequest object. You should call the aresp() method using this
+ AsyncRequest object as argument to get the final result or status
+ of this asynchronous request.
"""
-
+
if "pio_latlng" in params:
params["pio_latlng"] = ",".join(map(str, params["pio_latlng"]))
if "pio_inactive" in params:
params["pio_inactive"] = str(params["pio_inactive"]).lower()
path = "%s/users.json" % self.apiversion
- request = AsyncRequest("POST", path, pio_appkey=self.appkey, pio_uid=uid, **params)
+ request = AsyncRequest(
+ "POST", path, pio_appkey=self.appkey, pio_uid=uid, **params)
request.set_rfunc(self._acreate_user_resp)
self._connection.make_request(request)
return request
def _acreate_user_resp(self, response):
- """Private function to handle the AsyncResponse of the acreate_user request.
+ """Private function to handle the AsyncResponse of the acreate_user
+ request.
:param response: AsyncResponse object.
@@ -206,11 +243,12 @@
"""
if response.error is not None:
- raise UserNotCreatedError("Exception happened: %s for request %s" % \
+ raise UserNotCreatedError("Exception happened: %s for request %s" %
(response.error, response.request))
elif response.status != httplib.CREATED:
- raise UserNotCreatedError("request: %s status: %s body: %s" % \
- (response.request, response.status, response.body))
+ raise UserNotCreatedError("request: %s status: %s body: %s" %
+ (response.request, response.status,
+ response.body))
return None
@@ -220,11 +258,12 @@
:param uid: user id. type str.
:returns:
- AsyncRequest object. You should call the aresp() method using this AsyncRequest
- object as argument to get the final result or status of this asynchronous request.
+ AsyncRequest object. You should call the aresp() method using this
+ AsyncRequest object as argument to get the final result or status
+ of this asynchronous request.
"""
- enc_uid = urllib.quote(uid,"") # replace special char with %xx
+ enc_uid = urllib.quote(uid, "") # replace special char with %xx
path = "%s/users/%s.json" % (self.apiversion, enc_uid)
request = AsyncRequest("GET", path, pio_appkey=self.appkey)
request.set_rfunc(self._aget_user_resp)
@@ -233,7 +272,8 @@
return request
def _aget_user_resp(self, response):
- """Private function to handle the AsyncResponse of the aget_user request .
+ """Private function to handle the AsyncResponse of the aget_user
+ request .
:param response: AsyncResponse object.
@@ -245,13 +285,14 @@
"""
if response.error is not None:
- raise UserNotFoundError("Exception happened: %s for request %s" % \
+ raise UserNotFoundError("Exception happened: %s for request %s" %
(response.error, response.request))
elif response.status != httplib.OK:
- raise UserNotFoundError("request: %s status: %s body: %s" % \
- (response.request, response.status, response.body))
+ raise UserNotFoundError("request: %s status: %s body: %s" %
+ (response.request, response.status,
+ response.body))
- data = json.loads(response.body) # convert json string to dict
+ data = json.loads(response.body) # convert json string to dict
return data
def adelete_user(self, uid):
@@ -260,11 +301,12 @@
:param uid: user id. type str.
:returns:
- AsyncRequest object. You should call the aresp() method using this AsyncRequest
- object as argument to get the final result or status of this asynchronous request.
+ AsyncRequest object. You should call the aresp() method using this
+ AsyncRequest object as argument to get the final result or status
+ of this asynchronous request.
"""
-
- enc_uid = urllib.quote(uid,"") # replace special char with %xx
+
+ enc_uid = urllib.quote(uid, "") # replace special char with %xx
path = "%s/users/%s.json" % (self.apiversion, enc_uid)
request = AsyncRequest("DELETE", path, pio_appkey=self.appkey)
request.set_rfunc(self._adelete_user_resp)
@@ -273,7 +315,8 @@
return request
def _adelete_user_resp(self, response):
- """Private function to handle the AsyncResponse of the adelete_user request.
+ """Private function to handle the AsyncResponse of the adelete_user
+ request.
:param response: AsyncResponse object.
@@ -285,11 +328,12 @@
"""
if response.error is not None:
- raise UserNotDeletedError("Exception happened: %s for request %s" % \
+ raise UserNotDeletedError("Exception happened: %s for request %s" %
(response.error, response.request))
elif response.status != httplib.OK:
- raise UserNotDeletedError("request: %s status: %s body: %s" % \
- (response.request, response.status, response.body))
+ raise UserNotDeletedError("request: %s status: %s body: %s" %
+ (response.request, response.status,
+ response.body))
return None
def acreate_item(self, iid, itypes, params={}):
@@ -297,16 +341,19 @@
:param iid: item id. type str.
:param itypes: item types. Tuple of Str.
- For example, if this item belongs to item types "t1", "t2", "t3", "t4",
- then itypes=("t1", "t2", "t3", "t4").
- NOTE: if this item belongs to only one itype, use tuple of one element, eg. itypes=("t1",)
+ For example, if this item belongs to item types "t1", "t2",
+ "t3", "t4",then itypes=("t1", "t2", "t3", "t4").
+ NOTE: if this item belongs to only one itype, use tuple of one
+ element, eg. itypes=("t1",)
:param params: optional attributes. type dictionary.
- For example, { 'custom': 'value', 'pio_inactive' : True, 'pio_latlng': [4.5,67.8] }
+ For example, { 'custom': 'value', 'pio_inactive' : True,
+ 'pio_latlng': [4.5,67.8] }
:returns:
- AsyncRequest object. You should call the aresp() method using this AsyncRequest
- object as argument to get the final result or status of this asynchronous request.
+ AsyncRequest object. You should call the aresp() method using this
+ AsyncRequest object as argument to get the final result or status
+ of this asynchronous request.
"""
- itypes_str = ",".join(itypes) # join items with ","
+ itypes_str = ",".join(itypes) # join items with ","
if "pio_latlng" in params:
params["pio_latlng"] = ",".join(map(str, params["pio_latlng"]))
@@ -314,13 +361,15 @@
params["pio_inactive"] = str(params["pio_inactive"]).lower()
path = "%s/items.json" % self.apiversion
- request = AsyncRequest("POST", path, pio_appkey=self.appkey, pio_iid=iid, pio_itypes=itypes_str, **params)
+ request = AsyncRequest("POST", path, pio_appkey=self.appkey,
+ pio_iid=iid, pio_itypes=itypes_str, **params)
request.set_rfunc(self._acreate_item_resp)
self._connection.make_request(request)
return request
def _acreate_item_resp(self, response):
- """Private function to handle the AsyncResponse of the acreate_item request
+ """Private function to handle the AsyncResponse of the acreate_item
+ request
:param response: AsyncResponse object.
@@ -331,11 +380,12 @@
"""
if response.error is not None:
- raise ItemNotCreatedError("Exception happened: %s for request %s" % \
+ raise ItemNotCreatedError("Exception happened: %s for request %s" %
(response.error, response.request))
elif response.status != httplib.CREATED:
- raise ItemNotCreatedError("request: %s status: %s body: %s" % \
- (response.request, response.status, response.body))
+ raise ItemNotCreatedError("request: %s status: %s body: %s" %
+ (response.request, response.status,
+ response.body))
return None
def aget_item(self, iid):
@@ -344,8 +394,9 @@
:param iid: item id. type str.
:returns:
- AsyncRequest object. You should call the aresp() method using this AsyncRequest
- object as argument to get the final result or status of this asynchronous request.
+ AsyncRequest object. You should call the aresp() method using this
+ AsyncRequest object as argument to get the final result or status
+ of this asynchronous request.
"""
enc_iid = urllib.quote(iid, "")
path = "%s/items/%s.json" % (self.apiversion, enc_iid)
@@ -355,7 +406,8 @@
return request
def _aget_item_resp(self, response):
- """Private function to handle the AsyncResponse of the aget_item request
+ """Private function to handle the AsyncResponse of the aget_item
+ request
:param response: AsyncResponse object.
@@ -367,15 +419,17 @@
"""
if response.error is not None:
- raise ItemNotFoundError("Exception happened: %s for request %s" % \
+ raise ItemNotFoundError("Exception happened: %s for request %s" %
(response.error, response.request))
elif response.status != httplib.OK:
- raise ItemNotFoundError("request: %s status: %s body: %s" % \
- (response.request, response.status, response.body))
+ raise ItemNotFoundError("request: %s status: %s body: %s" %
+ (response.request, response.status,
+ response.body))
- data = json.loads(response.body) # convert json string to dict
+ data = json.loads(response.body) # convert json string to dict
if "pio_itypes" in data:
- data["pio_itypes"] = tuple(data["pio_itypes"]) # convert from list to tuple
+ # convert from list to tuple
+ data["pio_itypes"] = tuple(data["pio_itypes"])
return data
@@ -385,10 +439,11 @@
:param iid: item id. type str.
:returns:
- AsyncRequest object. You should call the aresp() method using this AsyncRequest
- object as argument to get the final result or status of this asynchronous request.
+ AsyncRequest object. You should call the aresp() method using this
+ AsyncRequest object as argument to get the final result or status
+ of this asynchronous request.
"""
-
+
enc_iid = urllib.quote(iid, "")
path = "%s/items/%s.json" % (self.apiversion, enc_iid)
request = AsyncRequest("DELETE", path, pio_appkey=self.appkey)
@@ -397,7 +452,8 @@
return request
def _adelete_item_resp(self, response):
- """Private function to handle the AsyncResponse of the adelete_item request
+ """Private function to handle the AsyncResponse of the adelete_item
+ request
:param response: AsyncResponse object
@@ -408,11 +464,12 @@
ItemNotDeletedError
"""
if response.error is not None:
- raise ItemNotDeletedError("Exception happened: %s for request %s" % \
+ raise ItemNotDeletedError("Exception happened: %s for request %s" %
(response.error, response.request))
elif response.status != httplib.OK:
- raise ItemNotDeletedError("request: %s status: %s body: %s" % \
- (response.request, response.status, response.body))
+ raise ItemNotDeletedError("request: %s status: %s body: %s" %
+ (response.request, response.status,
+ response.body))
return None
def _aget_user_itemrec_topn(self, engine, uid, n, params={}):
@@ -424,8 +481,9 @@
:param params: optional parameters. type dictionary
For example, { 'pio_itypes' : ("t1","t2") }
:returns:
- AsyncRequest object. You should call the aresp() method using this AsyncRequest
- object as argument to get the final result or status of this asynchronous request.
+ AsyncRequest object. You should call the aresp() method using this
+ AsyncRequest object as argument to get the final result or status
+ of this asynchronous request.
"""
if "pio_itypes" in params:
params["pio_itypes"] = ",".join(params["pio_itypes"])
@@ -435,13 +493,15 @@
params["pio_attributes"] = ",".join(params["pio_attributes"])
path = "%s/engines/itemrec/%s/topn.json" % (self.apiversion, engine)
- request = AsyncRequest("GET", path, pio_appkey=self.appkey, pio_uid=uid, pio_n=n, **params)
+ request = AsyncRequest("GET", path, pio_appkey=self.appkey,
+ pio_uid=uid, pio_n=n, **params)
request.set_rfunc(self._aget_user_itemrec_topn_resp)
self._connection.make_request(request)
return request
def _aget_user_itemrec_topn_resp(self, response):
- """Private function to handle the AsyncResponse of the aget_itemrec request
+ """Private function to handle the AsyncResponse of the aget_itemrec
+ request
:param response: AsyncResponse object
@@ -452,13 +512,15 @@
ItemRecNotFoundError.
"""
if response.error is not None:
- raise ItemRecNotFoundError("Exception happened: %s for request %s" % \
- (response.error, response.request))
+ raise ItemRecNotFoundError(
+ "Exception happened: %s for request %s" %
+ (response.error, response.request))
elif response.status != httplib.OK:
- raise ItemRecNotFoundError("request: %s status: %s body: %s" % \
- (response.request, response.status, response.body))
+ raise ItemRecNotFoundError("request: %s status: %s body: %s" %
+ (response.request, response.status,
+ response.body))
- data = json.loads(response.body) # convert json string to dict
+ data = json.loads(response.body) # convert json string to dict
return data
def aget_itemrec_topn(self, engine, n, params={}):
@@ -469,12 +531,14 @@
:param params: optional parameters. type dictionary
For example, { 'pio_itypes' : ("t1",) }
:returns:
- AsyncRequest object. You should call the aresp() method using this AsyncRequest
- object as argument to get the final result or status of this asynchronous request.
+ AsyncRequest object. You should call the aresp() method using this
+ AsyncRequest object as argument to get the final result or status
+ of this asynchronous request.
"""
if self._uid is None:
- raise InvalidArgumentError("uid is not identified. Please call identify(uid) first.")
+ raise InvalidArgumentError(
+ "uid is not identified. Please call identify(uid) first.")
request = self._aget_user_itemrec_topn(engine, self._uid, n, params)
return request
@@ -488,14 +552,16 @@
:param params: keyword arguments for optional attributes.
For example, pio_latlng="123.4, 56.7"
:returns:
- AsyncRequest object. You should call the aresp() method using this AsyncRequest
- object as argument to get the final result or status of this asynchronous request.
+ AsyncRequest object. You should call the aresp() method using this
+ AsyncRequest object as argument to get the final result or status
+ of this asynchronous request.
"""
request = self._aget_user_itemrec_topn(engine, uid, n, params)
return request
def _aget_itemsim_topn(self, engine, iid, n, params={}):
- """Private function to asynchronously get top n similar items of the item
+ """Private function to asynchronously get top n similar items of the
+ item
:param engine: name of the prediction engine. type str.
:param iid: item id. type str.
@@ -503,8 +569,9 @@
:param params: optional parameters. type dictionary
For example, { 'pio_itypes' : ("t1","t2") }
:returns:
- AsyncRequest object. You should call the aresp() method using this AsyncRequest
- object as argument to get the final result or status of this asynchronous request.
+ AsyncRequest object. You should call the aresp() method using this
+ AsyncRequest object as argument to get the final result or status
+ of this asynchronous request.
"""
if "pio_itypes" in params:
params["pio_itypes"] = ",".join(params["pio_itypes"])
@@ -514,13 +581,15 @@
params["pio_attributes"] = ",".join(params["pio_attributes"])
path = "%s/engines/itemsim/%s/topn.json" % (self.apiversion, engine)
- request = AsyncRequest("GET", path, pio_appkey=self.appkey, pio_iid=iid, pio_n=n, **params)
+ request = AsyncRequest("GET", path, pio_appkey=self.appkey,
+ pio_iid=iid, pio_n=n, **params)
request.set_rfunc(self._aget_itemsim_topn_resp)
self._connection.make_request(request)
return request
def _aget_itemsim_topn_resp(self, response):
- """Private function to handle the AsyncResponse of the aget_itemsim request
+ """Private function to handle the AsyncResponse of the aget_itemsim
+ request
:param response: AsyncResponse object
@@ -531,13 +600,15 @@
ItemSimNotFoundError.
"""
if response.error is not None:
- raise ItemSimNotFoundError("Exception happened: %s for request %s" % \
- (response.error, response.request))
+ raise ItemSimNotFoundError(
+ "Exception happened: %s for request %s" %
+ (response.error, response.request))
elif response.status != httplib.OK:
- raise ItemSimNotFoundError("request: %s status: %s body: %s" % \
- (response.request, response.status, response.body))
+ raise ItemSimNotFoundError("request: %s status: %s body: %s" %
+ (response.request, response.status,
+ response.body))
- data = json.loads(response.body) # convert json string to dict
+ data = json.loads(response.body) # convert json string to dict
return data
def aget_itemsim_topn(self, engine, iid, n, params={}):
@@ -549,8 +620,9 @@
:param params: optional parameters. type dictionary
For example, { 'pio_itypes' : ("t1",) }
:returns:
- AsyncRequest object. You should call the aresp() method using this AsyncRequest
- object as argument to get the final result or status of this asynchronous request.
+ AsyncRequest object. You should call the aresp() method using this
+ AsyncRequest object as argument to get the final result or status
+ of this asynchronous request.
"""
request = self._aget_itemsim_topn(engine, iid, n, params)
@@ -559,28 +631,35 @@
def _auser_action_on_item(self, action, uid, iid, params):
"""Private function to asynchronously create an user action on an item
- :param action: action type. type str. ("like", "dislike", "conversion", "rate", "view")
+ :param action: action type. type str. ("like", "dislike", "conversion",
+ "rate", "view")
:param uid: user id. type str or int.
:param iid: item id. type str or int.
:param params: optional attributes. type dictionary.
For example, { 'pio_rate' : 4, 'pio_latlng' : [1.23,4.56] }
- NOTE: For "rate" action, pio_rate attribute is required. integer value of 1-5 (1 is least preferred and 5 is most preferred)
+ NOTE: For "rate" action, pio_rate attribute is required.
+ integer value of 1-5 (1 is least preferred and 5 is most
+ preferred)
:returns:
- AsyncRequest object. You should call the aresp() method using this AsyncRequest
- object as argument to get the final result or status of this asynchronous request.
+ AsyncRequest object. You should call the aresp() method using this
+ AsyncRequest object as argument to get the final result or status
+ of this asynchronous request.
"""
if "pio_latlng" in params:
params["pio_latlng"] = ",".join(map(str, params["pio_latlng"]))
path = "%s/actions/u2i.json" % (self.apiversion)
- request = AsyncRequest("POST", path, pio_appkey=self.appkey, pio_action=action, pio_uid=uid, pio_iid=iid, **params)
+ request = AsyncRequest("POST", path, pio_appkey=self.appkey,
+ pio_action=action, pio_uid=uid, pio_iid=iid,
+ **params)
request.set_rfunc(self._auser_action_on_item_resp)
self._connection.make_request(request)
return request
def _auser_action_on_item_resp(self, response):
- """Private function to handle the AsyncResponse of the _auser_action_on_item request
+ """Private function to handle the AsyncResponse of the
+ _auser_action_on_item request
:param response: AsyncResponse object
@@ -591,46 +670,55 @@
U2IActionNotCreatedError
"""
if response.error is not None:
- raise U2IActionNotCreatedError("Exception happened: %s for request %s" % \
- (response.error, response.request))
+ raise U2IActionNotCreatedError(
+ "Exception happened: %s for request %s" %
+ (response.error, response.request))
elif response.status != httplib.CREATED:
- raise U2IActionNotCreatedError("request: %s status: %s body: %s" % \
- (response.request, response.status, response.body))
+ raise U2IActionNotCreatedError("request: %s status: %s body: %s" %
+ (response.request, response.status,
+ response.body))
return None
def arecord_action_on_item(self, action, iid, params={}):
"""Asynchronously create action on item
- :param action: action name. type String. For example, "rate", "like", etc
+ :param action: action name. type String. For example, "rate", "like",
+ etc
:param iid: item id. type str or int.
:param params: optional attributes. type dictionary.
For example, { 'pio_rate' : 4, 'pio_latlng': [4.5,67.8] }
- NOTE: For "rate" action, pio_rate attribute is required. integer value of 1-5 (1 is least preferred and 5 is most preferred)
-
+ NOTE: For "rate" action, pio_rate attribute is required.
+ integer value of 1-5 (1 is least preferred and 5 is most
+ preferred)
+
:returns:
- AsyncRequest object. You should call the aresp() method using this AsyncRequest
- object as argument to get the final result or status of this asynchronous request.
+ AsyncRequest object. You should call the aresp() method using this
+ AsyncRequest object as argument to get the final result or status
+ of this asynchronous request.
:raises:
U2IActionNotCreatedError
"""
if self._uid is None:
- raise InvalidArgumentError("uid is not identified. Please call identify(uid) first.")
+ raise InvalidArgumentError(
+ "uid is not identified. Please call identify(uid) first.")
request = self._auser_action_on_item(action, self._uid, iid, params)
return request
def auser_conversion_item(self, uid, iid, **params):
- """Deprecated. Asynchronously create an user conversion action on an item
+ """Deprecated. Asynchronously create an user conversion action on an
+ item
:param uid: user id. type str.
:param iid: item id. type str.
:param params: keyword arguments for optional attributes.
For example, pio_latlng=[123.4, 56.7]
:returns:
- AsyncRequest object. You should call the aresp() method using this AsyncRequest
- object as argument to get the final result or status of this asynchronous request.
+ AsyncRequest object. You should call the aresp() method using this
+ AsyncRequest object as argument to get the final result or status
+ of this asynchronous request.
"""
request = self._auser_action_on_item(CONVERSION_API, uid, iid, params)
return request
@@ -643,8 +731,9 @@
:param params: keyword arguments for optional attributes.
For example, pio_latlng=[123.4, 56.7]
:returns:
- AsyncRequest object. You should call the aresp() method using this AsyncRequest
- object as argument to get the final result or status of this asynchronous request.
+ AsyncRequest object. You should call the aresp() method using this
+ AsyncRequest object as argument to get the final result or status
+ of this asynchronous request.
"""
request = self._auser_action_on_item(DISLIKE_API, uid, iid, params)
return request
@@ -657,8 +746,9 @@
:param params: keyword arguments for optional attributes.
For example, pio_latlng=[123.4, 56.7]
:returns:
- AsyncRequest object. You should call the aresp() method using this AsyncRequest
- object as argument to get the final result or status of this asynchronous request.
+ AsyncRequest object. You should call the aresp() method using this
+ AsyncRequest object as argument to get the final result or status
+ of this asynchronous request.
"""
request = self._auser_action_on_item(LIKE_API, uid, iid, params)
return request
@@ -668,12 +758,14 @@
:param uid: user id. type str.
:param iid: item id. type str.
- :param rate: rating. integer value of 1-5 (1 is least preferred and 5 is most preferred)
+ :param rate: rating. integer value of 1-5 (1 is least preferred and 5
+ is most preferred)
:param params: keyword arguments for optional attributes.
For example, pio_latlng=[123.4, 56.7]
:returns:
- AsyncRequest object. You should call the aresp() method using this AsyncRequest
- object as argument to get the final result or status of this asynchronous request.
+ AsyncRequest object. You should call the aresp() method using this
+ AsyncRequest object as argument to get the final result or status
+ of this asynchronous request.
"""
params['pio_rate'] = rate
@@ -688,8 +780,9 @@
:param params: keyword arguments for optional attributes.
For example, pio_latlng=[123.4, 56.7]
:returns:
- AsyncRequest object. You should call the aresp() method using this AsyncRequest
- object as argument to get the final result or status of this asynchronous request.
+ AsyncRequest object. You should call the aresp() method using this
+ AsyncRequest object as argument to get the final result or status
+ of this asynchronous request.
"""
request = self._auser_action_on_item(VIEW_API, uid, iid, params)
return request
@@ -697,28 +790,36 @@
def aresp(self, request):
"""Get the result of the asynchronous request
- :param request: AsyncRequest object. This object must be returned by the asynchronous request function
- For example, to get the result of a aget_user() request, call this aresp() with the argument of
- AsyncRequest object returned by aget_user().
+ :param request: AsyncRequest object. This object must be returned by
+ the asynchronous request function
+ For example, to get the result of a aget_user()
+ request, call this aresp() with the argument of
+ AsyncRequest object returned by aget_user().
:returns:
- The result of this AsyncRequest. The return type is the same as the return type of corresponding blocking request.
+ The result of this AsyncRequest. The return type is the same as
+ the return type of corresponding blocking request.
For example,
- Calling aresp() with acreate_user() AsyncRequest returns the same type as create_user(), which is None.
+ Calling aresp() with acreate_user() AsyncRequest returns the same
+ type as create_user(), which is None.
- Calling aresp() with aget_user() AsyncRequest returns the same type as get_user(), which is dictionary data.
+ Calling aresp() with aget_user() AsyncRequest returns the same
+ type as get_user(), which is dictionary data.
:raises:
- Exception may be raised if there is error happened. The type of exception is the same as exception type
- of the correspdoning blocking request.
+ Exception may be raised if there is error happened. The type of
+ exception is the same as exception type of the correspdoning
+ blocking request.
For example,
- Calling aresp() with acreate_user() AsyncRequest may raise UserNotCreatedError exception.
+ Calling aresp() with acreate_user() AsyncRequest may raise
+ UserNotCreatedError exception.
- Calling aresp() with aget_user() AsyncRequest may raise UserNotFoundError exception.
+ Calling aresp() with aget_user() AsyncRequest may raise
+ UserNotFoundError exception.
"""
response = request.get_response()
@@ -730,7 +831,8 @@
:param uid: user id. type str.
:param params: optional attributes. type dictionary.
- For example, { 'custom': 'value', 'pio_inactive' : True, 'pio_latlng': [4.5,67.8] }
+ For example, { 'custom': 'value', 'pio_inactive' : True,
+ 'pio_latlng': [4.5,67.8] }
:returns:
None.
@@ -780,11 +882,13 @@
:param iid: item id. type str.
:param itypes: item types. Tuple of Str.
- For example, if this item belongs to item types "t1", "t2", "t3", "t4",
- then itypes=("t1", "t2", "t3", "t4").
- NOTE: if this item belongs to only one itype, use tuple of one element, eg. itypes=("t1",)
+ For example, if this item belongs to item types "t1", "t2",
+ "t3", "t4", then itypes=("t1", "t2", "t3", "t4").
+ NOTE: if this item belongs to only one itype, use tuple of one
+ element, eg. itypes=("t1",)
:param params: optional attributes. type dictionary.
- For example, { 'custom': 'value', 'pio_inactive' : True, 'pio_latlng': [4.5,67.8] }
+ For example, { 'custom': 'value', 'pio_inactive' : True,
+ 'pio_latlng': [4.5,67.8] }
:returns:
None
@@ -888,11 +992,14 @@
def record_action_on_item(self, action, iid, params={}):
"""Blocking request to create action on an item
- :param action: action name. type String. For example, "rate", "like", etc
+ :param action: action name. type String. For example, "rate", "like",
+ etc
:param iid: item id. type str.
:param params: optional attributes. type dictionary.
For example, { 'pio_rate' : 4, 'pio_latlng' : [1.23,4.56] }
- NOTE: For "rate" action, pio_rate attribute is required. integer value of 1-5 (1 is least preferred and 5 is most preferred)
+ NOTE: For "rate" action, pio_rate attribute is required.
+ integer value of 1-5 (1 is least preferred and 5 is most
+ preferred)
:returns:
None
@@ -905,7 +1012,8 @@
return result
def user_conversion_item(self, uid, iid, **params):
- """Deprecated. Blocking request to create user conversion action on an item
+ """Deprecated. Blocking request to create user conversion action on an
+ item
:param uid: user id. type str.
:param iid: item id. type str.
@@ -923,7 +1031,8 @@
return result
def user_dislike_item(self, uid, iid, **params):
- """Deprecated. Blocking request to create user dislike action on an item
+ """Deprecated. Blocking request to create user dislike action on an
+ item
:param uid: user id. type str.
:param iid: item id. type str.
@@ -963,7 +1072,8 @@
:param uid: user id. type str.
:param iid: item id. type str.
- :param rate: rating. integer value of 1-5 (1 is least preferred and 5 is most preferred)
+ :param rate: rating. integer value of 1-5 (1 is least preferred and 5
+ is most preferred)
:param params: keyword arguments for optional attributes.
For example, pio_latlng=[123.4, 56.7]
diff --git a/predictionio/connection.py b/predictionio/connection.py
index f114ed3..400b33a 100644
--- a/predictionio/connection.py
+++ b/predictionio/connection.py
@@ -1,33 +1,33 @@
import Queue
-import thread
import threading
import httplib
import urllib
-import time
import datetime
import logging
# some constants
-MAX_RETRY = 1 # 0 means no retry
+MAX_RETRY = 1 # 0 means no retry
# logger
logger = None
DEBUG_LOG = False
+
def enable_log(filename=None):
global logger
global DEBUG_LOG
timestamp = datetime.datetime.today()
- if filename is None:
- logfile = "./log/predictionio_%s.log" % timestamp.strftime("%Y-%m-%d_%H:%M:%S.%f")
+ if not filename:
+ logfile = "./log/predictionio_%s.log" % timestamp.strftime(
+ "%Y-%m-%d_%H:%M:%S.%f")
else:
logfile = filename
- logging.basicConfig(filename=logfile, \
- filemode='w', \
- level=logging.DEBUG, \
+ logging.basicConfig(filename=logfile,
+ filemode='w',
+ level=logging.DEBUG,
format='[%(levelname)s] %(name)s (%(threadName)s) %(message)s')
logger = logging.getLogger(__name__)
DEBUG_LOG = True
@@ -36,67 +36,79 @@
class PredictionIOAPIError(Exception):
pass
+
class NotSupportMethodError(PredictionIOAPIError):
pass
+
class ProgramError(PredictionIOAPIError):
pass
+
class AsyncRequest:
+
"""AsyncRequest object
-
+
"""
+
def __init__(self, method, path, **params):
self.method = method # "GET" "POST" etc
- self.path = path # the sub path eg. POST /v1/users.json GET /v1/users/1.json
- self.params = params # dictionary format eg. {"appkey" : 123, "id" : 3}
- self.response_q = Queue.Queue(1) # use queue to implement response, store AsyncResponse object
+ # the sub path eg. POST /v1/users.json GET /v1/users/1.json
+ self.path = path
+ # dictionary format eg. {"appkey" : 123, "id" : 3}
+ self.params = params
+ # use queue to implement response, store AsyncResponse object
+ self.response_q = Queue.Queue(1)
self.qpath = "%s?%s" % (self.path, urllib.urlencode(self.params))
self._response = None
- self.rfunc = None # response function to be called to handle the response
-
+ # response function to be called to handle the response
+ self.rfunc = None
+
def __str__(self):
- return "%s %s %s %s" % (self.method, self.path, self.params, self.qpath)
-
+ return "%s %s %s %s" % (self.method, self.path, self.params,
+ self.qpath)
+
def set_rfunc(self, func):
self.rfunc = func
-
+
def set_response(self, response):
""" store the response
-
+
NOTE: Must be only called once
"""
self.response_q.put(response)
-
+
def get_response(self):
"""get the response
-
+
"""
if self._response is None:
- self._response = self.response_q.get(True) # NOTE: blocking
-
- return self._response
-
+ self._response = self.response_q.get(True) # NOTE: blocking
+
+ return self._response
+
class AsyncResponse:
- """AsyncResponse object.
-
+
+ """AsyncResponse object.
+
Store the response of asynchronous request
-
- when get the response, user should check if error is None (which means no
+
+ when get the response, user should check if error is None (which means no
Exception happens)
if error is None, then should check if the status is expected
-
+
Attributes:
error: exception object if any happens
version: int
status: int
reason: str
headers: dict
- body: str (NOTE: not necessarily can be converted to JSON,
+ body: str (NOTE: not necessarily can be converted to JSON,
eg, for GET request to /v1/status)
request: the corresponding AsyncRequest object
"""
+
def __init__(self):
self.error = None
self.version = None
@@ -104,72 +116,76 @@
self.reason = None
self.headers = None
self.body = None
- self.request = None # point back to the request object
-
+ self.request = None # point back to the request object
+
def __str__(self):
- return "e:%s v:%s s:%s r:%s h:%s b:%s" % (self.error, self.version, self.status, \
- self.reason, self.headers, self.body)
-
+ return "e:%s v:%s s:%s r:%s h:%s b:%s" % (self.error, self.version,
+ self.status, self.reason,
+ self.headers, self.body)
+
def set_resp(self, version, status, reason, headers, body):
self.version = version
self.status = status
self.reason = reason
self.headers = headers
self.body = body
-
+
def set_error(self, error):
self.error = error
-
+
def set_request(self, request):
self.request = request
-
class PredictionIOHttpConnection():
+
def __init__(self, host, https=True):
- if https: # https connection
+ if https: # https connection
self._connection = httplib.HTTPSConnection(host)
else:
self._connection = httplib.HTTPConnection(host)
-
+
def connect(self):
self._connection.connect()
-
+
def close(self):
self._connection.close()
-
+
def request(self, method, url, body={}, headers={}):
"""
http request wrapper function, with retry capability in case of error.
catch error exception and store it in AsyncResponse object
return AsyncResponse object
-
+
Args:
method: http method, type str
url: url path, type str
body: http request body content, type dict
header: http request header , type dict
"""
-
+
response = AsyncResponse()
try:
- retry_limit = MAX_RETRY # number of retry in case of error (minimum 0 means no retry)
- mod_headers = dict(headers) # copy the headers
+ # number of retry in case of error (minimum 0 means no retry)
+ retry_limit = MAX_RETRY
+ mod_headers = dict(headers) # copy the headers
mod_headers["Connection"] = "keep-alive"
enc_body = None
- if body: # if body is not empty
+ if body: # if body is not empty
enc_body = urllib.urlencode(body)
- mod_headers["Content-type"] = "application/x-www-form-urlencoded"
+ mod_headers[
+ "Content-type"] = "application/x-www-form-urlencoded"
#mod_headers["Accept"] = "text/plain"
except Exception as e:
response.set_error(e)
return response
-
+
if DEBUG_LOG:
- logger.debug("Request m:%s u:%s h:%s b:%s", method, url, mod_headers, enc_body)
+ logger.debug("Request m:%s u:%s h:%s b:%s", method, url,
+ mod_headers, enc_body)
# retry loop
- for i in range(retry_limit+1):
+ for i in range(retry_limit + 1):
try:
if i != 0:
if DEBUG_LOG:
@@ -182,7 +198,7 @@
if i == retry_limit:
# new copy of e created everytime??
response.set_error(e)
- else: #NOTE: this is try's else clause
+ else: # NOTE: this is try's else clause
# connect() and request() OK
try:
resp = self._connection.getresponse()
@@ -190,50 +206,52 @@
self._connection.close()
if i == retry_limit:
response.set_error(e)
- else: #NOTE: this is try's else clause
+ else: # NOTE: this is try's else clause
# getresponse() OK
- resp_version = resp.version # int
- resp_status = resp.status # int
- resp_reason = resp.reason # str
+ resp_version = resp.version # int
+ resp_status = resp.status # int
+ resp_reason = resp.reason # str
# resp.getheaders() returns list of tuples
# converted to dict format
resp_headers = dict(resp.getheaders())
- # NOTE: have to read the response before sending out next http request
- resp_body = resp.read() # str
- response.set_resp(version=resp_version, status=resp_status, \
- reason=resp_reason, headers=resp_headers, \
+ # NOTE: have to read the response before sending out next
+ # http request
+ resp_body = resp.read() # str
+ response.set_resp(version=resp_version, status=resp_status,
+ reason=resp_reason, headers=resp_headers,
body=resp_body)
- break # exit retry loop
+ break # exit retry loop
# end of retry loop
if DEBUG_LOG:
logger.debug("Response %s", response)
- return response # AsyncResponse object
+ return response # AsyncResponse object
+
def connection_worker(host, request_queue, https=True, loop=True):
- """worker function which establishes connection and wait for request jobs
+ """worker function which establishes connection and wait for request jobs
from the request_queue
-
+
Args:
request_queue: the request queue storing the AsyncRequest object
- valid requests:
- GET
+ valid requests:
+ GET
POST
- DELETE
+ DELETE
KILL
https: HTTPS (True) or HTTP (False)
- loop: This worker function stays in a loop waiting for request
+ loop: This worker function stays in a loop waiting for request
For testing purpose only. should always be set to True.
"""
connect = PredictionIOHttpConnection(host, https)
-
+
# loop waiting for job form request queue
killed = not loop
-
+
while True:
- #print "thread %s waiting for request" % thread.get_ident()
- request = request_queue.get(True) # NOTE: blocking get
- #print "get request %s" % request
+ # print "thread %s waiting for request" % thread.get_ident()
+ request = request_queue.get(True) # NOTE: blocking get
+ # print "get request %s" % request
method = request.method
if method == "GET":
path = request.qpath
@@ -251,26 +269,29 @@
d = AsyncResponse()
else:
d = AsyncResponse()
- d.set_error(NotSupportMethodError("Don't Support the method %s" % method))
-
+ d.set_error(NotSupportMethodError(
+ "Don't Support the method %s" % method))
+
d.set_request(request)
request.set_response(d)
request_queue.task_done()
if killed:
break
-
+
# end of while loop
connect.close()
class Connection:
+
"""abstract object for connection with server
-
+
spawn multiple connection_worker threads to handle jobs in the queue q
"""
+
def __init__(self, host, threads=1, qsize=0, https=True):
"""constructor
-
+
Args:
host: host of the server.
threads: type int, number of threads to be spawn
@@ -279,23 +300,24 @@
"""
self.host = host
self.https = https
- self.q = Queue.Queue(qsize) # if qsize=0, means infinite
+ self.q = Queue.Queue(qsize) # if qsize=0, means infinite
self.threads = threads
# start thread based on threads number
- self.tid = {} # dictionary of thread object
-
+ self.tid = {} # dictionary of thread object
+
for i in range(threads):
- tname = "PredictionIOThread-%s" % i # thread name
- self.tid[i] = threading.Thread(target=connection_worker, name=tname, \
- args=(self.host, self.q, self.https))
+ tname = "PredictionIOThread-%s" % i # thread name
+ self.tid[i] = threading.Thread(
+ target=connection_worker, name=tname,
+ args=(self.host, self.q, self.https))
self.tid[i].setDaemon(True)
self.tid[i].start()
-
+
def make_request(self, request):
"""put the request into the q
"""
self.q.put(request)
-
+
def pending_requests(self):
"""number of pending requests in the queue
"""
@@ -307,11 +329,8 @@
# set kill message to q
for i in range(self.threads):
self.make_request(AsyncRequest("KILL", ""))
-
- self.q.join() # wait for q empty
-
- for i in range(self.threads): # wait for all thread finish
+
+ self.q.join() # wait for q empty
+
+ for i in range(self.threads): # wait for all thread finish
self.tid[i].join()
-
-
-
diff --git a/setup.py b/setup.py
index 2657910..7653cb4 100644
--- a/setup.py
+++ b/setup.py
@@ -9,7 +9,7 @@
author=__author__,
author_email=__email__,
packages=['predictionio'],
- url = 'http://prediction.io',
+ url='http://prediction.io',
license='LICENSE.txt',
description='PredictionIO Python SDK',
classifiers=['Programming Language :: Python',
@@ -44,4 +44,4 @@
PredictionIO API to Python programmers so that they
can focus on their application logic.
"""
-)
+ )