Merge branch 'develop'
diff --git a/.gitignore b/.gitignore
index 6311c3e..66fa4a6 100644
--- a/.gitignore
+++ b/.gitignore
@@ -8,3 +8,4 @@
MANIFEST
docs/build/
tests_env/
+*.zip
diff --git a/docs/source/conf.py b/docs/source/conf.py
index 43be1a0..40647a0 100644
--- a/docs/source/conf.py
+++ b/docs/source/conf.py
@@ -17,7 +17,10 @@
# add these directories to sys.path here. If the directory is relative to the
# documentation root, use os.path.abspath to make it absolute, like shown here.
#sys.path.insert(0, os.path.abspath('.'))
-sys.path.append("..")
+#sys.path.append("..")
+# Prefer the SDK sources in this repository over any installed copy: insert the
+# path at position 0 so that it overrides the system package path.
+sys.path.insert(0, "..")
# -- General configuration -----------------------------------------------------
@@ -42,16 +45,16 @@
# General information about the project.
project = u'PredictionIO Python SDK'
-copyright = u'2013, TappingStone, Inc.'
+copyright = u'2014, TappingStone, Inc.'
# The version info for the project you're documenting, acts as replacement for
# |version| and |release|, also used in various other places throughout the
# built documents.
#
# The short X.Y version.
-version = '0.7'
+version = '0.8'
# The full version, including alpha/beta/rc tags.
-release = '0.7.0'
+release = '0.8.0'
# The language for content autogenerated by Sphinx. Refer to documentation
# for a list of supported languages.
diff --git a/docs/source/index.rst b/docs/source/index.rst
index dfad529..bc7a25b 100644
--- a/docs/source/index.rst
+++ b/docs/source/index.rst
@@ -43,6 +43,7 @@
:maxdepth: 1
predictionio
+ predictionio_legacy
Getting Help
diff --git a/docs/source/predictionio.rst b/docs/source/predictionio.rst
index 4249a99..e7b90e7 100644
--- a/docs/source/predictionio.rst
+++ b/docs/source/predictionio.rst
@@ -3,122 +3,74 @@
.. automodule:: predictionio
-predictionio.Client Usage Overview
---------------------------------------
+The SDK comprises two clients:
-Before making any request through the PredictionIO API, you need to create a client object for your App.
+1. EventClient, used to import data into the PredictionIO platform.
+2. EngineClient, used to query a PredictionIO Engine Instance, i.e. to submit queries
+ and extract prediction results.
- >>> client = predictionio.Client(<your App Key>)
+Please read `PredictionIO Quick Start
+<http://docs.prediction.io/0.8.0/tutorials/engines/quickstart.html>`_ for
+detailed explanation.
-.. note:: The App Key can be found in the PredictionIO Admin Server web control panel.
+predictionio.EventClient Class
+------------------------------
-Afterwards, you can import data or retrieve recommendations for your App by calling methods of this object. For example,
+.. autoclass:: EventClient
+ :members:
-**User**
+ .. note::
- To import a user record from you App with user id = "u100":
+ The "threads" parameter specifies the number of connection threads to
+ the PredictionIO server. Minimum is 1. The client object will spawn
+ out the specified number of threads. Each of them will establish a
+ connection with the PredictionIO server and handle requests
+ concurrently.
- >>> client.create_user("u100")
+ .. note::
- To import a user record with the optional predefined location attribute "pio_latlng":
+ If you ONLY use `blocking request methods`,
+ setting "threads" to 1 is enough (higher number will not improve
+ anything since every request will be blocking). However, if you want
+ to take full advantage of
+ `asynchronous request methods`, you should
+ specify a larger number for "threads" to increase the performance of
+ handling concurrent requests (although setting "threads" to 1 will still
+ work). The optimal setting depends on your system and application
+ requirement.
+
- >>> client.create_user("u100", { "pio_latlng" : [1.23, 4.56] })
+predictionio.EngineClient Class
+-------------------------------
- You may also define your own custom attribute "custom" = "value":
+.. autoclass:: EngineClient
+ :members:
- >>> client.create_user("u100", { "pio_latlng" : [1.23, 4.56], "custom": "value" })
+predictionio.AsyncRequest Class
+-------------------------------
- .. note:: custom attributes and values could be any string but all attribute names with prefix ``'pio_'`` are reserved. You should not use the prefix ``'pio_'`` when define your custom attributes to avoid conflicts.
+.. autoclass:: AsyncRequest
+ :members:
-**Item**
-
- To import an item record from your App with item id = "i200" and item type = "type3":
-
- >>> client.create_item("i200", ("type3",))
-
- To import an item record with the predefined optional attribute "pio_latlng":
-
- >>> client.create_item("i200", ("type3",), { "pio_latlng" : [1.23, 4.56] })
-
- You may also define your own custom attribute "custom" = "value":
-
- >>> client.create_item("i200", ("type3",), { "pio_latlng" : [1.23, 4.56], "custom": "value" })
-
- .. note:: custom attributes and values could be any string but all attribute names with prefix ``'pio_'`` are reserved. You should not use the prefix ``'pio_'`` when define your custom attributes to avoid conflicts.
-
-**User Action on Item**
-
- To import a user "rate" action record from your App with user id = "u100", item id = "i200" and rating = 2:
-
- >>> client.identify("u100")
- >>> client.record_action_on_item("rate", "i200", { "pio_rate": 2 })
-
- .. note:: the "pio_rate" attribute is required for "rate" action.
-
- To import a "view" action record from your App for the same user and item:
-
- >>> client.record_action_on_item("view", "i200" )
-
- To import a "like" record with predefined optional timestamp attribute "pio_t":
-
- >>> client.record_action_on_item("like", "i200", { "pio_t": 12345678 })
-
- .. note:: predefined actions: "like", "dislike", "rate", "view", "conversion"
-
-**Item Recommendation Engine**
-
- When there is enough data imported from your App and the prediction results are ready, you can get recommendations for a user.
-
- To get top 5 item recommendation for the same user id from the item recommendation engine "engine-1"
-
- >>> result = client.get_itemrec_topn("engine-1", 5)
-
-**Item Similarity Engine**
-
- To get top 5 similar items of the item i200 from the item similarity engine "engine-2"
-
- >>> result = client.get_itemsim_topn("engine-2", "i200", 5)
-
- To get top 5 similar items given a list of items i200,i300,i400 from the item similarity engine "engine-2"
-
- >>> result = client.get_itemsim_topn("engine-2", "i200,i300,i400", 5)
-
-**Item Rank Engine**
-
- To rank a list of items i100, i101, i102, i103 for user "u100" from the item rank engine "engine-3"
-
- >>> client.identify("u100")
- >>> result = client.get_itemrank_ranked("engine-3", ["i100","i101","i102", "i103"])
-
-
-Please refer to the documentation of the :class:`predictionio.Client` class for more details of all available methods.
-
-
-Error Handling
---------------
-
-An exception will be raised when an error occur during the request. Please refer to the documentation of the :class:`predictionio.Client` class for details.
-In general, you may want to catch the exception and decide what to do with the error (such as logging it).
-
-For example, the method :meth:`~Client.record_action_on_item` may raise ``U2IActionNotCreatedError``.
-
- >>> try:
- >>> client.record_action_on_item("view", "i200")
- >>> except:
- >>> <log the error>
-
+predictionio SDK Usage Notes
+----------------------------
Asynchronous Requests
----------------------
+^^^^^^^^^^^^^^^^^^^^^
-In addition to normal :ref:`blocking (synchronous) request methods <sync-methods-label>`, this SDK also provides :ref:`methods which can generate asynchronous requests <async-methods-label>`.
-All methods prefixed with 'a' are asynchronous (eg, :meth:`~Client.acreate_user`, :meth:`~Client.acreate_item`).
-Asynchronous requests are handled by separate threads in the background, so you can generate multiple requests at the same time without waiting for any of them to finish.
-These methods return immediately without waiting for results, allowing your code to proceed to work on something else.
-The concept is to break a normal blocking request (such as :meth:`~Client.create_user`) into two steps:
+In addition to normal `blocking (synchronous) request methods`,
+this SDK also provides `non-blocking (asynchronous) request methods`.
+All methods
+prefixed with 'a' are asynchronous (e.g., :meth:`~EventClient.aset_user`,
+:meth:`~EventClient.aset_item`). Asynchronous requests are handled by separate
+threads in the background, so you can generate multiple requests at the same
+time without waiting for any of them to finish. These methods return
+immediately without waiting for results, allowing your code to proceed to work
+on something else. The concept is to break a normal blocking request (such as
+:meth:`~EventClient.set_user`) into two steps:
-1. generate the request (e.g., calling :meth:`~Client.acreate_user`);
-2. get the request status and return data (calling :meth:`~Client.aresp`);
+1. generate the request (e.g., calling :meth:`~EngineClient.asend_query`);
+2. get the request's response by calling :meth:`~AsyncRequest.get_response`.
This allows you to do other work between these two steps.
@@ -129,208 +81,44 @@
If you do care about the request status or need to get the return data, then at a later time you will need to call :meth:`~Client.aresp` with the AsyncRequest object returned in step 1.
Please refer to the documentation of :ref:`asynchronous request methods <async-methods-label>` for more details.
-For example, the following code first generates an asynchronous request to retrieve recommendations, then get the result at later time::
+For example, the following code first generates an asynchronous request to
+retrieve recommendations, then gets the result at a later time::
>>> # Generates asynchronous request and return an AsyncRequest object
- >>> request = client.aget_itemrec_topn("engine-1", 5)
+ >>> engine_client = EngineClient()
+ >>> request = engine_client.asend_query(data={"uid": "1", "n" : 3})
>>> <...you can do other things here...>
>>> try:
- >>> result = client.aresp(request) # check the request status and get the return data.
+ >>> result = request.get_response() # check the request status and get the return data.
>>> except:
>>> <log the error>
Batch Import Data
------------------
+^^^^^^^^^^^^^^^^^^^^^
-When you import large amount of data at once, you may also use asynchronous request methods to generate lots of requests in the beginning and then check the status at a later time to minimize run time.
+When you import a large amount of data at once, you may also use asynchronous
+request methods to generate lots of requests in the beginning and then check the
+status at a later time to minimize run time.
For example, to import 100000 of user records::
>>> # generate 100000 asynchronous requests and store the AsyncRequest objects
- >>> req = {}
+ >>> event_client = EventClient(app_id=1)
>>> for i in range(100000):
- >>> req[i] = client.acreate_user(user_record[i].uid)
+ >>> event_client.aset_user(user_record[i].uid)
>>>
>>> <...you can do other things here...>
>>>
- >>> # now check the status of the previous asynchronous requests
- >>> for i in range(100000):
- >>> try:
- >>> result = client.aresp(req[i])
- >>> except:
- >>> <log the error>
+ >>> # calling close will block until all requests are processed
+ >>> event_client.close()
Alternatively, you can use blocking requests to import large amount of data, but this has significantly lower performance::
>>> for i in range(100000):
>>> try:
- >>> client.create_user(user_record[i].uid)
+ >>> event_client.set_user(user_record[i].uid)
>>> except:
>>> <log the error>
-|
-
-predictionio.Client Class
----------------------------------
-
-.. Autoclass:: Client
-
- .. note::
-
- The "threads" parameter specifies the number of connection threads to
- the PredictionIO API server. Minimum is 1. The client object will spawn
- out the specified number of threads. Each of them will establish a
- connection with the PredictionIO API server and handle requests
- concurrently.
-
- .. note::
-
- If you ONLY use :ref:`blocking request methods <sync-methods-label>`,
- setting "threads" to 1 is enough (higher number will not improve
- anything since every request will be blocking). However, if you want
- to take full advantage of
- :ref:`asynchronous request methods <async-methods-label>`, you should
- specify a larger number for "threads" to increase the performance of
- handling concurrent requests (although setting "threads" to 1 will still
- work). The optimal setting depends on your system and application
- requirement.
-
- .. automethod:: close
- .. automethod:: identify
-
- .. versionadded:: 0.5.0
-
- .. automethod:: pending_requests
-
- .. versionadded:: 0.6.1
-
- |
-
- .. _sync-methods-label:
-
- .. note:: The following is blocking (synchronous) request methods
-
- .. automethod:: get_status
- .. automethod:: create_user
- .. automethod:: get_user
- .. automethod:: delete_user
-
- .. automethod:: create_item
- .. automethod:: get_item
- .. automethod:: delete_item
-
- .. automethod:: record_action_on_item
-
- .. versionadded:: 0.5.0
-
- .. automethod:: get_itemrec_topn
-
- .. versionadded:: 0.5.0
-
- .. versionchanged:: 0.6.0
- Change the order of parameters.
-
- .. automethod:: get_itemsim_topn
-
- .. versionadded:: 0.6.0
-
- .. automethod:: get_itemrank_ranked
-
- .. versionadded:: 0.7.0
-
- .. automethod:: get_itemrec
-
- .. deprecated:: 0.5.0
- Use :func:`get_itemrec_topn` instead.
-
- .. automethod:: user_conversion_item
-
- .. deprecated:: 0.5.0
- Use :func:`record_action_on_item` instead.
-
- .. automethod:: user_dislike_item
-
- .. deprecated:: 0.5.0
- Use :func:`record_action_on_item` instead.
-
- .. automethod:: user_like_item
-
- .. deprecated:: 0.5.0
- Use :func:`record_action_on_item` instead.
-
- .. automethod:: user_rate_item
-
- .. deprecated:: 0.5.0
- Use :func:`record_action_on_item` instead.
-
- .. automethod:: user_view_item
-
- .. deprecated:: 0.5.0
- Use :func:`record_action_on_item` instead.
-
- |
-
- .. _async-methods-label:
-
- .. note:: The following is non-blocking (asynchronous) request methods
-
- .. automethod:: acreate_user
- .. automethod:: aget_user
- .. automethod:: adelete_user
-
- .. automethod:: acreate_item
- .. automethod:: aget_item
- .. automethod:: adelete_item
-
- .. automethod:: arecord_action_on_item
-
- .. versionadded:: 0.5.0
-
- .. automethod:: aget_itemrec_topn
-
- .. versionadded:: 0.5.0
-
- .. versionchanged:: 0.6.0
- Change the order of parameters.
-
- .. automethod:: aget_itemsim_topn
-
- .. versionadded:: 0.6.0
-
- .. automethod:: aget_itemrank_ranked
-
- .. versionadded:: 0.7.0
-
- .. automethod:: aget_itemrec
-
- .. deprecated:: 0.5.0
- Use :func:`aget_itemrec_topn` instead.
-
- .. automethod:: auser_conversion_item
-
- .. deprecated:: 0.5.0
- Use :func:`arecord_action_on_item` instead.
-
- .. automethod:: auser_dislike_item
-
- .. deprecated:: 0.5.0
- Use :func:`arecord_action_on_item` instead.
-
- .. automethod:: auser_like_item
-
- .. deprecated:: 0.5.0
- Use :func:`arecord_action_on_item` instead.
-
- .. automethod:: auser_rate_item
-
- .. deprecated:: 0.5.0
- Use :func:`arecord_action_on_item` instead.
-
- .. automethod:: auser_view_item
-
- .. deprecated:: 0.5.0
- Use :func:`arecord_action_on_item` instead.
-
- .. automethod:: aresp
diff --git a/docs/source/predictionio_legacy.rst b/docs/source/predictionio_legacy.rst
new file mode 100644
index 0000000..42e8edb
--- /dev/null
+++ b/docs/source/predictionio_legacy.rst
@@ -0,0 +1,325 @@
+predictionio Legacy Package Documentation
+=========================================
+
+.. automodule:: predictionio
+
+predictionio.Client (Deprecated) Usage Overview
+-----------------------------------------------
+
+.. deprecated:: 0.8.0
+
+Before making any request through the PredictionIO API, you need to create a
+client object for your App.
+
+ >>> client = predictionio.Client(<your App Key>)
+
+.. note:: The App Key can be found in the PredictionIO Admin Server web control panel.
+
+Afterwards, you can import data or retrieve recommendations for your App by
+calling methods of this object. For example,
+
+**User**
+
+ To import a user record from your App with user id = "u100":
+
+ >>> client.create_user("u100")
+
+ To import a user record with the optional predefined location attribute "pio_latlng":
+
+ >>> client.create_user("u100", { "pio_latlng" : [1.23, 4.56] })
+
+ You may also define your own custom attribute "custom" = "value":
+
+ >>> client.create_user("u100", { "pio_latlng" : [1.23, 4.56], "custom": "value" })
+
+ .. note:: custom attributes and values could be any string but all attribute names with prefix ``'pio_'`` are reserved. You should not use the prefix ``'pio_'`` when defining your custom attributes to avoid conflicts.
+
+**Item**
+
+ To import an item record from your App with item id = "i200" and item type = "type3":
+
+ >>> client.create_item("i200", ("type3",))
+
+ To import an item record with the predefined optional attribute "pio_latlng":
+
+ >>> client.create_item("i200", ("type3",), { "pio_latlng" : [1.23, 4.56] })
+
+ You may also define your own custom attribute "custom" = "value":
+
+ >>> client.create_item("i200", ("type3",), { "pio_latlng" : [1.23, 4.56], "custom": "value" })
+
+ .. note:: custom attributes and values could be any string but all attribute names with prefix ``'pio_'`` are reserved. You should not use the prefix ``'pio_'`` when defining your custom attributes to avoid conflicts.
+
+**User Action on Item**
+
+ To import a user "rate" action record from your App with user id = "u100", item id = "i200" and rating = 2:
+
+ >>> client.identify("u100")
+ >>> client.record_action_on_item("rate", "i200", { "pio_rate": 2 })
+
+ .. note:: the "pio_rate" attribute is required for "rate" action.
+
+ To import a "view" action record from your App for the same user and item:
+
+ >>> client.record_action_on_item("view", "i200" )
+
+ To import a "like" record with predefined optional timestamp attribute "pio_t":
+
+ >>> client.record_action_on_item("like", "i200", { "pio_t": 12345678 })
+
+ .. note:: predefined actions: "like", "dislike", "rate", "view", "conversion"
+
+**Item Recommendation Engine**
+
+ When there is enough data imported from your App and the prediction results are ready, you can get recommendations for a user.
+
+ To get top 5 item recommendation for the same user id from the item recommendation engine "engine-1"
+
+ >>> result = client.get_itemrec_topn("engine-1", 5)
+
+**Item Similarity Engine**
+
+ To get top 5 similar items of the item i200 from the item similarity engine "engine-2"
+
+ >>> result = client.get_itemsim_topn("engine-2", "i200", 5)
+
+ To get top 5 similar items given a list of items i200,i300,i400 from the item similarity engine "engine-2"
+
+ >>> result = client.get_itemsim_topn("engine-2", "i200,i300,i400", 5)
+
+**Item Rank Engine**
+
+ To rank a list of items i100, i101, i102, i103 for user "u100" from the item rank engine "engine-3"
+
+ >>> client.identify("u100")
+ >>> result = client.get_itemrank_ranked("engine-3", ["i100","i101","i102", "i103"])
+
+
+Please refer to the documentation of the :class:`predictionio.Client` class for more details of all available methods.
+
+
+Error Handling
+--------------
+
+An exception will be raised when an error occurs during the request. Please refer to the documentation of the :class:`predictionio.Client` class for details.
+In general, you may want to catch the exception and decide what to do with the error (such as logging it).
+
+For example, the method :meth:`~Client.record_action_on_item` may raise ``U2IActionNotCreatedError``.
+
+ >>> try:
+ >>> client.record_action_on_item("view", "i200")
+ >>> except:
+ >>> <log the error>
+
+
+Asynchronous Requests
+---------------------
+
+In addition to normal :ref:`blocking (synchronous) request methods <sync-methods-label>`, this SDK also provides :ref:`methods which can generate asynchronous requests <async-methods-label>`.
+All methods prefixed with 'a' are asynchronous (eg, :meth:`~Client.acreate_user`, :meth:`~Client.acreate_item`).
+Asynchronous requests are handled by separate threads in the background, so you can generate multiple requests at the same time without waiting for any of them to finish.
+These methods return immediately without waiting for results, allowing your code to proceed to work on something else.
+The concept is to break a normal blocking request (such as :meth:`~Client.create_user`) into two steps:
+
+1. generate the request (e.g., calling :meth:`~Client.acreate_user`);
+2. get the request status and return data (calling :meth:`~Client.aresp`);
+
+This allows you to do other work between these two steps.
+
+.. note::
+ In some cases you may not care whether the request is successful for performance or application-specific reasons, then you can simply skip step 2.
+
+.. note::
+ If you do care about the request status or need to get the return data, then at a later time you will need to call :meth:`~Client.aresp` with the AsyncRequest object returned in step 1.
+ Please refer to the documentation of :ref:`asynchronous request methods <async-methods-label>` for more details.
+
+For example, the following code first generates an asynchronous request to retrieve recommendations, then gets the result at a later time::
+
+ >>> # Generates asynchronous request and return an AsyncRequest object
+ >>> request = client.aget_itemrec_topn("engine-1", 5)
+ >>> <...you can do other things here...>
+ >>> try:
+ >>> result = client.aresp(request) # check the request status and get the return data.
+ >>> except:
+ >>> <log the error>
+
+
+Batch Import Data
+-----------------
+
+When you import a large amount of data at once, you may also use asynchronous request methods to generate lots of requests in the beginning and then check the status at a later time to minimize run time.
+
+For example, to import 100000 of user records::
+
+ >>> # generate 100000 asynchronous requests and store the AsyncRequest objects
+ >>> req = {}
+ >>> for i in range(100000):
+ >>> req[i] = client.acreate_user(user_record[i].uid)
+ >>>
+ >>> <...you can do other things here...>
+ >>>
+ >>> # now check the status of the previous asynchronous requests
+ >>> for i in range(100000):
+ >>> try:
+ >>> result = client.aresp(req[i])
+ >>> except:
+ >>> <log the error>
+
+Alternatively, you can use blocking requests to import a large amount of data, but this has significantly lower performance::
+
+ >>> for i in range(100000):
+ >>> try:
+ >>> client.create_user(user_record[i].uid)
+ >>> except:
+ >>> <log the error>
+
+
+
+
+predictionio.Client Class
+---------------------------------
+
+.. autoclass:: Client
+.. deprecated:: 0.8.0
+ Use :class:`EventClient` and :class:`EngineClient` instead.
+
+
+ .. automethod:: close
+ .. automethod:: identify
+
+ .. versionadded:: 0.5.0
+
+ .. automethod:: pending_requests
+
+ .. versionadded:: 0.6.1
+
+ |
+
+ .. _sync-methods-label:
+
+ .. note:: The following is blocking (synchronous) request methods
+
+ .. automethod:: get_status
+ .. automethod:: create_user
+ .. automethod:: get_user
+ .. automethod:: delete_user
+
+ .. automethod:: create_item
+ .. automethod:: get_item
+ .. automethod:: delete_item
+
+ .. automethod:: record_action_on_item
+
+ .. versionadded:: 0.5.0
+
+ .. automethod:: get_itemrec_topn
+
+ .. versionadded:: 0.5.0
+
+ .. versionchanged:: 0.6.0
+ Change the order of parameters.
+
+ .. automethod:: get_itemsim_topn
+
+ .. versionadded:: 0.6.0
+
+ .. automethod:: get_itemrank_ranked
+
+ .. versionadded:: 0.7.0
+
+ .. automethod:: get_itemrec
+
+ .. deprecated:: 0.5.0
+ Use :func:`get_itemrec_topn` instead.
+
+ .. automethod:: user_conversion_item
+
+ .. deprecated:: 0.5.0
+ Use :func:`record_action_on_item` instead.
+
+ .. automethod:: user_dislike_item
+
+ .. deprecated:: 0.5.0
+ Use :func:`record_action_on_item` instead.
+
+ .. automethod:: user_like_item
+
+ .. deprecated:: 0.5.0
+ Use :func:`record_action_on_item` instead.
+
+ .. automethod:: user_rate_item
+
+ .. deprecated:: 0.5.0
+ Use :func:`record_action_on_item` instead.
+
+ .. automethod:: user_view_item
+
+ .. deprecated:: 0.5.0
+ Use :func:`record_action_on_item` instead.
+
+ |
+
+ .. _async-methods-label:
+
+ .. note:: The following is non-blocking (asynchronous) request methods
+
+ .. automethod:: acreate_user
+ .. automethod:: aget_user
+ .. automethod:: adelete_user
+
+ .. automethod:: acreate_item
+ .. automethod:: aget_item
+ .. automethod:: adelete_item
+
+ .. automethod:: arecord_action_on_item
+
+ .. versionadded:: 0.5.0
+
+ .. automethod:: aget_itemrec_topn
+
+ .. versionadded:: 0.5.0
+
+ .. versionchanged:: 0.6.0
+ Change the order of parameters.
+
+ .. automethod:: aget_itemsim_topn
+
+ .. versionadded:: 0.6.0
+
+ .. automethod:: aget_itemrank_ranked
+
+ .. versionadded:: 0.7.0
+
+ .. automethod:: aget_itemrec
+
+ .. deprecated:: 0.5.0
+ Use :func:`aget_itemrec_topn` instead.
+
+ .. automethod:: auser_conversion_item
+
+ .. deprecated:: 0.5.0
+ Use :func:`arecord_action_on_item` instead.
+
+ .. automethod:: auser_dislike_item
+
+ .. deprecated:: 0.5.0
+ Use :func:`arecord_action_on_item` instead.
+
+ .. automethod:: auser_like_item
+
+ .. deprecated:: 0.5.0
+ Use :func:`arecord_action_on_item` instead.
+
+ .. automethod:: auser_rate_item
+
+ .. deprecated:: 0.5.0
+ Use :func:`arecord_action_on_item` instead.
+
+ .. automethod:: auser_view_item
+
+ .. deprecated:: 0.5.0
+ Use :func:`arecord_action_on_item` instead.
+
+ .. automethod:: aresp
+
+
diff --git a/examples/demo-movielens/README.md b/examples/demo-movielens/README.md
new file mode 100644
index 0000000..bcf5aed
--- /dev/null
+++ b/examples/demo-movielens/README.md
@@ -0,0 +1,16 @@
+PredictionIO Python SDK Example - MovieLens
+===========================================
+
+Please execute all commands from the repository root.
+
+Step 1. Get sample data and unzip it.
+```
+$ curl -o ml-100k.zip http://www.grouplens.org/system/files/ml-100k.zip
+$ unzip ml-100k.zip
+```
+
+Step 2. Run this app:
+```
+$ python -m examples.demo-movielens.batch_import <app_id> <server_url>
+```
+
diff --git a/examples/itemrec/movies/__init__.py b/examples/demo-movielens/__init__.py
similarity index 100%
copy from examples/itemrec/movies/__init__.py
copy to examples/demo-movielens/__init__.py
diff --git a/examples/demo-movielens/appdata.py b/examples/demo-movielens/appdata.py
new file mode 100644
index 0000000..d9637ef
--- /dev/null
+++ b/examples/demo-movielens/appdata.py
@@ -0,0 +1,183 @@
+
+import datetime
+from operator import itemgetter, attrgetter
+
+# can get sample data here:
+# wget http://www.grouplens.org/system/files/ml-100k.zip
+# app data file config
+APPDATA_DIRNAME = "ml-100k"
+USERS_FILENAME = "u.user"
+USERS_FILE_DELIMITER = "|"
+ITEMS_FILENAME = "u.item"
+ITEMS_FILE_DELIMITER = "|"
+RATE_ACTIONS_FILENAME = "u.data"
+RATE_ACTIONS_DELIMITER = "\t"
+
+
+class User:
+ def __init__(self, uid):
+ self.uid = uid
+ self.rec = [] # recommendations, list of iid
+
+ def __str__(self):
+ return "User[uid=%s,rec=%s]" % (self.uid, self.rec)
+
+class Item:
+ def __init__(self, iid, name, release_date, genres, year):
+ self.iid = iid
+ self.name = name
+ self.release_date = release_date # datetime.datetime object
+ self.genres = genres
+ self.year = year
+
+ def __str__(self):
+ return "Item[iid=%s,name=%s,release_date=%s,genres=%s]" % (self.iid, self.name, self.release_date, self.genres)
+
+class RateAction:
+ def __init__(self, uid, iid, rating, t):
+ self.uid = uid
+ self.iid = iid
+ self.rating = rating
+ self.t = t
+
+ def __str__(self):
+ return "RateAction[uid=%s,iid=%s,rating=%s,t=%s]" % (self.uid, self.iid, self.rating, self.t)
+
+
+class AppData:
+
+ def __init__(self):
+ self._users = {} # dict of User obj
+ self._items = {} # dict of Item obj
+ self._rate_actions = [] # list of RateAction obj
+
+ self._users_file = "%s/%s" % (APPDATA_DIRNAME, USERS_FILENAME)
+ self._items_file = "%s/%s" % (APPDATA_DIRNAME, ITEMS_FILENAME)
+ self._rate_actions_file = "%s/%s" % (APPDATA_DIRNAME, RATE_ACTIONS_FILENAME)
+ self.__init_users()
+ self.__init_items()
+ self.__init_rate_actions()
+
+ def __init_users(self):
+ """
+ uid|
+ """
+ print "[Info] Initializing users..."
+ f = open(self._users_file, 'r')
+ for line in f:
+ data = line.rstrip('\r\n').split(USERS_FILE_DELIMITER)
+ self.add_user(User(data[0]))
+ f.close()
+ print "[Info] %s users were initialized." % len(self._users)
+
+ def __init_items(self):
+ """
+ movie id | movie title | release date | video release date |
+ IMDb URL | unknown | Action | Adventure | Animation |
+ Children's | Comedy | Crime | Documentary | Drama | Fantasy |
+ Film-Noir | Horror | Musical | Mystery | Romance | Sci-Fi |
+ Thriller | War | Western |
+ The last 19 fields are the genres, a 1 indicates the movie
+ is of that genre, a 0 indicates it is not; movies can be in
+ several genres at once.
+
+ """
+ genre_names = [ "unknown", "Action", "Adventure", "Animation",
+ "Children's", "Comedy", "Crime", "Documentary", "Drama", "Fantasy",
+ "Film-Noir", "Horror", "Musical", "Mystery", "Romance", "Sci-Fi",
+ "Thriller", "War", "Western"]
+
+ print "[Info] Initializing items..."
+ f = open(self._items_file, 'r')
+ for line in f:
+ data = line.rstrip('\r\n').split(ITEMS_FILE_DELIMITER)
+ genres_flags = data[5:24]
+
+ genres = () # tuple of genres
+ for g,flag in zip(genre_names, genres_flags):
+ if flag == '1':
+ genres = genres + (g,)
+
+ try:
+ # eg. 01-Jan-1994
+ release_date = datetime.datetime.strptime(data[2], "%d-%b-%Y").replace(microsecond=1)
+ (day, month, year) = data[2].split('-')
+ except:
+ print "[Note] item %s %s doesn't have release date. Skip it." % (data[0], data[1])
+ else:
+ self.add_item(Item(
+ iid=data[0],
+ name=data[1],
+ release_date=release_date,
+ genres=genres,
+ year=year))
+ f.close()
+ print "[Info] %s items were initialized." % len(self._items)
+
+ def __init_rate_actions(self):
+ """
+ uid|iid|rating|timestamp
+ """
+ print "[Info] Initializing rate actions..."
+ f = open(self._rate_actions_file, 'r')
+ for line in f:
+ data = line.rstrip('\r\n').split(RATE_ACTIONS_DELIMITER)
+ t = datetime.datetime.utcfromtimestamp(int(data[3])).replace(microsecond=1)
+ self.add_rate_action(RateAction(data[0], data[1], data[2], t))
+ f.close()
+ print "[Info] %s rate actions were initialized." % len(self._rate_actions)
+
+ def add_user(self, user):
+ self._users[user.uid] = user
+
+ def add_item(self, item):
+ self._items[item.iid] = item
+
+ def add_rate_action(self, action):
+ self._rate_actions.append(action)
+
+ def get_users(self):
+ return self._users
+
+ def get_items(self):
+ return self._items
+
+ def get_rate_actions(self):
+ return self._rate_actions
+
+ def get_user(self, uid):
+ """return single user
+ """
+ if uid in self._users:
+ return self._users[uid]
+ else:
+ return None
+
+ def get_item(self, iid):
+ """return single item
+ """
+ if iid in self._items:
+ return self._items[iid]
+ else:
+ return None
+
+ def get_top_rated_items(self, uid, n):
+ """get top n rated iids by this uid
+ """
+ if uid in self._users:
+ actions = filter(lambda u: u.uid==uid, self._rate_actions)
+ top = sorted(actions, key=attrgetter('rating'), reverse=True)
+ topn_iids = map(lambda a: a.iid, top[:n])
+ return topn_iids
+ else:
+ return None
+
+ def get_top_rate_actions(self, uid, n):
+ """get top n rated actions by this uid
+ """
+ if uid in self._users:
+ actions = filter(lambda u: u.uid==uid, self._rate_actions)
+ top = sorted(actions, key=attrgetter('rating'), reverse=True)
+ return top[:n]
+ else:
+ return None
diff --git a/examples/demo-movielens/batch_import.py b/examples/demo-movielens/batch_import.py
new file mode 100644
index 0000000..fa2e1c0
--- /dev/null
+++ b/examples/demo-movielens/batch_import.py
@@ -0,0 +1,91 @@
+
+from appdata import AppData
+import predictionio
+import sys
+import pytz
+
+def batch_import_task(app_id, app_data, client, all_info=False):
+
+ print "[Info] Importing users to PredictionIO..."
+ count = 0
+ for k, v in app_data.get_users().iteritems():
+ count += 1
+ if all_info:
+ print "[Info] Importing %s..." % v
+ else:
+ if (count % 32 == 0):
+ sys.stdout.write('\r[Info] %s' % count)
+ sys.stdout.flush()
+
+ client.aset_user(uid=v.uid)
+
+ sys.stdout.write('\r[Info] %s users were imported.\n' % count)
+ sys.stdout.flush()
+
+ print "[Info] Importing items to PredictionIO..."
+ count = 0
+ for k, v in app_data.get_items().iteritems():
+ count += 1
+ if all_info:
+ print "[Info] Importing %s..." % v
+ else:
+ if (count % 32 == 0):
+ sys.stdout.write('\r[Info] %s' % count)
+ sys.stdout.flush()
+
+ itypes = ("movie",) + v.genres
+ client.aset_item(iid=v.iid,
+ properties={
+ "pio_itypes" : list(itypes),
+ "pio_starttime" : v.release_date.isoformat() + 'Z',
+ "name" : v.name,
+ "year" : v.year } )
+
+ sys.stdout.write('\r[Info] %s items were imported.\n' % count)
+ sys.stdout.flush()
+
+ print "[Info] Importing rate actions to PredictionIO..."
+ count = 0
+ for v in app_data.get_rate_actions():
+ count += 1
+ if all_info:
+ print "[Info] Importing %s..." % v
+ else:
+ if (count % 32 == 0):
+ sys.stdout.write('\r[Info] %s' % count)
+ sys.stdout.flush()
+
+ properties = { "pio_rating" : int(v.rating) }
+ req = client.acreate_event(
+ event="rate",
+ entity_type="pio_user",
+ entity_id=v.uid,
+ target_entity_type="pio_item",
+ target_entity_id=v.iid,
+ properties=properties,
+ event_time=v.t.replace(tzinfo=pytz.utc),
+ )
+
+ sys.stdout.write('\r[Info] %s rate actions were imported.\n' % count)
+ sys.stdout.flush()
+
+
+if __name__ == '__main__':
+ if len(sys.argv) < 3:
+ sys.exit("Usage: python -m examples.demo-movielens.batch_import "
+ "<app_id> <url>")
+
+ app_id = int(sys.argv[1])
+
+ client = predictionio.EventClient(
+ app_id=app_id,
+ url=sys.argv[2],
+ threads=5,
+ qsize=500)
+
+ # Test connection
+ print "Status:", client.get_status()
+
+ app_data = AppData()
+ batch_import_task(app_id, app_data, client)
+ client.close()
diff --git a/examples/event_sample.py b/examples/event_sample.py
new file mode 100644
index 0000000..2ba2069
--- /dev/null
+++ b/examples/event_sample.py
@@ -0,0 +1,134 @@
+from predictionio import EventClient
+from predictionio import NotFoundError
+from datetime import datetime
+import pytz
+import sys
+
+client = EventClient(app_id=4, url="http://localhost:7070")
+
+# Check status
+print("Check status")
+print(client.get_status())
+
+# First event
+first_event_properties = {
+ "prop1" : 1,
+ "prop2" : "value2",
+ "prop3" : [1, 2, 3],
+ "prop4" : True,
+ "prop5" : ["a", "b", "c"],
+ "prop6" : 4.56 ,
+ }
+# Attach pytz zones via localize(); tzinfo= in the constructor yields LMT.
+first_event_time = pytz.timezone('US/Mountain').localize(
+ datetime(2004, 12, 13, 21, 39, 45, 618000))
+first_event_response = client.create_event(
+ event="my_event", entity_type="user", entity_id="uid",
+ properties=first_event_properties,
+ # create_event() takes the timestamp as `event_time`.
+ event_time=first_event_time,
+ )
+print("First Event response")
+print(first_event_response)
+print
+
+# Second event
+second_event_properties = {
+ "someProperty" : "value1",
+ "anotherProperty" : "value2",
+ }
+second_event_response = client.create_event(
+ event="my_event",
+ entity_type="user",
+ entity_id="uid",
+ target_entity_type="item",
+ target_entity_id="iid",
+ properties=second_event_properties,
+ event_time=datetime(2014, 12, 13, 21, 38, 45, 618000, pytz.utc))
+print("Second Event response")
+print(second_event_response)
+print
+
+
+# Get the first event from Event Server
+first_event_id = first_event_response.json_body["eventId"]
+print("Get Event")
+event = client.get_event(first_event_id)
+print(event)
+print
+
+# Delete the first event from Event Server
+print("Delete Event")
+delete_response = client.delete_event(first_event_id)
+print(delete_response)
+print
+
+# Deleting the first event again is expected to raise NotFoundError.
+print("Delete Event Again")
+try:
+ delete_response = client.delete_event(first_event_id)
+except NotFoundError as ex:
+ print("The expected error: {0}".format(ex))
+print
+
+# "user"-helper methods
+
+# Setting user properties implicitly creates a user.
+# This call creates a user "foo", and sets the properties of "foo".
+print("Create user foo")
+foo_properties = {"city": "sunnyvale", "car": "honda fit"}
+print(client.set_user("foo", properties=foo_properties))
+
+# This call overrides the existing properties for user "foo", setting "car" to
+# a new "honda odyssey" and creating a new property "food" set to "seafood".
+print("Set new properties")
+foo_properties = {"car": "honda odyssey", "food": "seafood"}
+print(client.set_user("foo", properties=foo_properties))
+
+# This call removes the specified properties. It ignores the value of the dict.
+# After this call, the "city" will become an unset field.
+print("Unset properties")
+foo_properties = {"city": "x"}
+print(client.unset_user("foo", properties=foo_properties))
+
+# This call deletes a user
+print("Delete user")
+print(client.delete_user("foo"))
+
+# The SDK also supports specifying the eventTime. It is useful for importing
+# events that happened in the past. pytz zones are attached with localize().
+foo_time = pytz.timezone('US/Pacific').localize(datetime(2014, 8, 31, 4, 56))
+print("Create user at " + str(foo_time))
+print(client.set_user("Jarvis", {}, foo_time))
+
+# "item"-helper methods
+
+# Setting item properties implicitly creates an item.
+# This call creates an item "bar", and sets the properties of "bar".
+print("Create item bar")
+bar_properties = {"city": "santa clara", "weight": 6.9}
+print(client.set_item("bar", properties=bar_properties))
+
+# Similar to user-methods, we can do the same thing with item
+print("Set new properties")
+bar_properties = {"weight": 6.2}
+print(client.set_item("bar", properties=bar_properties))
+
+# This call removes the specified properties. It ignores the value of the dict.
+# After this call, the "city" will become an unset field.
+print("Unset properties")
+bar_properties = {"city": None}
+print(client.unset_item("bar", properties=bar_properties))
+
+# This call deletes an item
+print("Delete item")
+print(client.delete_item("bar"))
+
+
+# "record" action helper functions
+
+# This call creates an event between a user and an item. In particular, this sets
+# the price of the action
+print("Record user action")
+action_properties = {"price": 10.0}
+print(client.record_user_action_on_item("buy", "foo", "bar", action_properties))
diff --git a/examples/import_yahoo.py b/examples/import_yahoo.py
new file mode 100644
index 0000000..69421d4
--- /dev/null
+++ b/examples/import_yahoo.py
@@ -0,0 +1,208 @@
+"""
+Import historical stock data from yahoo finance.
+"""
+
+import argparse
+from datetime import datetime
+import predictionio
+import pytz
+import time
+from pandas.io import data as pdata
+import numpy
+
+EPOCH = datetime(1970, 1, 1, tzinfo=pytz.utc)
+
+SP500_LIST = [
+ "A", "AA", "AAPL", "ABBV", "ABC", "ABT", "ACE", "ACN", "ACT", "ADBE", "ADI",
+ "ADM", "ADP", "ADS", "ADSK", "ADT", "AEE", "AEP", "AES", "AET", "AFL",
+ "AGN", "AIG", "AIV", "AIZ", "AKAM", "ALL", "ALLE", "ALTR", "ALXN", "AMAT",
+ "AME", "AMGN", "AMP", "AMT", "AMZN", "AN", "AON", "APA", "APC", "APD",
+ "APH", "ARG", "ATI", "AVB", "AVP", "AVY", "AXP", "AZO", "BA", "BAC", "BAX",
+ "BBBY", "BBT", "BBY", "BCR", "BDX", "BEAM", "BEN", "BF-B", "BHI", "BIIB",
+ "BK", "BLK", "BLL", "BMS", "BMY", "BRCM", "BRK-B", "BSX", "BTU", "BWA",
+ "BXP", "C", "CA", "CAG", "CAH", "CAM", "CAT", "CB", "CBG", "CBS", "CCE",
+ "CCI", "CCL", "CELG", "CERN", "CF", "CFN", "CHK", "CHRW", "CI", "CINF",
+ "CL", "CLX", "CMA", "CMCSA", "CME", "CMG", "CMI", "CMS", "CNP", "CNX",
+ "COF", "COG", "COH", "COL", "COP", "COST", "COV", "CPB", "CRM", "CSC",
+ "CSCO", "CSX", "CTAS", "CTL", "CTSH", "CTXS", "CVC", "CVS", "CVX", "D",
+ "DAL", "DD", "DE", "DFS", "DG", "DGX", "DHI", "DHR", "DIS", "DISCA", "DLPH",
+ "DLTR", "DNB", "DNR", "DO", "DOV", "DOW", "DPS", "DRI", "DTE", "DTV", "DUK",
+ "DVA", "DVN", "EA", "EBAY", "ECL", "ED", "EFX", "EIX", "EL", "EMC", "EMN",
+ "EMR", "EOG", "EQR", "EQT", "ESRX", "ESS", "ESV", "ETFC", "ETN", "ETR",
+ "EW", "EXC", "EXPD", "EXPE", "F", "FAST", "FB", "FCX", "FDO", "FDX", "FE",
+ "FFIV", "FIS", "FISV", "FITB", "FLIR", "FLR", "FLS", "FMC", "FOSL", "FOXA",
+ "FRX", "FSLR", "FTI", "FTR", "GAS", "GCI", "GD", "GE", "GGP", "GHC", "GILD",
+ "GIS", "GLW", "GM", "GMCR", "GME", "GNW", "GOOG", "GOOGL", "GPC", "GPS",
+ "GRMN", "GS", "GT", "GWW", "HAL", "HAR", "HAS", "HBAN", "HCBK", "HCN",
+ "HCP", "HD", "HES", "HIG", "HOG", "HON", "HOT", "HP", "HPQ", "HRB", "HRL",
+ "HRS", "HSP", "HST", "HSY", "HUM", "IBM", "ICE", "IFF", "IGT", "INTC",
+ "INTU", "IP", "IPG", "IR", "IRM", "ISRG", "ITW", "IVZ", "JBL", "JCI", "JEC",
+ "JNJ", "JNPR", "JOY", "JPM", "JWN", "K", "KEY", "KIM", "KLAC", "KMB", "KMI",
+ "KMX", "KO", "KORS", "KR", "KRFT", "KSS", "KSU", "L", "LB", "LEG", "LEN",
+ "LH", "LLL", "LLTC", "LLY", "LM", "LMT", "LNC", "LO", "LOW", "LRCX", "LSI",
+ "LUK", "LUV", "LYB", "M", "MA", "MAC", "MAR", "MAS", "MAT", "MCD", "MCHP",
+ "MCK", "MCO", "MDLZ", "MDT", "MET", "MHFI", "MHK", "MJN", "MKC", "MMC",
+ "MMM", "MNST", "MO", "MON", "MOS", "MPC", "MRK", "MRO", "MS", "MSFT", "MSI",
+ "MTB", "MU", "MUR", "MWV", "MYL", "NBL", "NBR", "NDAQ", "NE", "NEE", "NEM",
+ "NFLX", "NFX", "NI", "NKE", "NLSN", "NOC", "NOV", "NRG", "NSC", "NTAP",
+ "NTRS", "NU", "NUE", "NVDA", "NWL", "NWSA", "OI", "OKE", "OMC", "ORCL",
+ "ORLY", "OXY", "PAYX", "PBCT", "PBI", "PCAR", "PCG", "PCL", "PCLN", "PCP",
+ "PDCO", "PEG", "PEP", "PETM", "PFE", "PFG", "PG", "PGR", "PH", "PHM", "PKI",
+ "PLD", "PLL", "PM", "PNC", "PNR", "PNW", "POM", "PPG", "PPL", "PRGO", "PRU",
+ "PSA", "PSX", "PVH", "PWR", "PX", "PXD", "QCOM", "QEP", "R", "RAI", "RDC",
+ "REGN", "RF", "RHI", "RHT", "RIG", "RL", "ROK", "ROP", "ROST", "RRC", "RSG",
+ "RTN", "SBUX", "SCG", "SCHW", "SE", "SEE", "SHW", "SIAL", "SJM", "SLB",
+ "SLM", "SNA", "SNDK", "SNI", "SO", "SPG", "SPLS", "SRCL", "SRE", "STI",
+ "STJ", "STT", "STX", "STZ", "SWK", "SWN", "SWY", "SYK", "SYMC", "SYY", "T",
+ "TAP", "TDC", "TE", "TEG", "TEL", "TGT", "THC", "TIF", "TJX", "TMK", "TMO",
+ "TRIP", "TROW", "TRV", "TSCO", "TSN", "TSO", "TSS", "TWC", "TWX", "TXN",
+ "TXT", "TYC", "UNH", "UNM", "UNP", "UPS", "URBN", "USB", "UTX", "V", "VAR",
+ "VFC", "VIAB", "VLO", "VMC", "VNO", "VRSN", "VRTX", "VTR", "VZ", "WAG",
+ "WAT", "WDC", "WEC", "WFC", "WFM", "WHR", "WIN", "WLP", "WM", "WMB", "WMT",
+ "WU", "WY", "WYN", "WYNN", "X", "XEL", "XL", "XLNX", "XOM", "XRAY", "XRX",
+ "XYL", "YHOO", "YUM", "ZION", "ZMH", "ZTS"]
+
+ETF_LIST = ["QQQ", "SPY", "XLY", "XLP", "XLE", "XLF", "XLV",
+ "XLI", "XLB", "XLK", "XLU"]
+
+
+def since_epoch(dt):
+ return (dt - EPOCH).total_seconds()
+
+
+def import_data(client, app_id, ticker, start_time, end_time, event_time):
+ print "Importing:", ticker, start_time, end_time
+
+ try:
+ df = pdata.DataReader(ticker, 'yahoo', start_time, end_time)
+ print "Extracted:", df.index[0], df.index[-1]
+ except IOError, ex:
+ print ex
+ print "Data not exist. Returning"
+ return
+
+ # assume we only extract US data
+ eastern = pytz.timezone('US/Eastern')
+
+ columns = [
+ ('Open', 'open'),
+ ('High', 'high'),
+ ('Low', 'low'),
+ ('Close', 'close'),
+ ('Volume', 'volume'),
+ ('Adj Close', 'adjclose')]
+
+ yahoo_data = dict()
+ yahoo_data['ticker'] = ticker
+ yahoo_data['t'] = [
+ # hour=16 to indicate market close time
+ since_epoch(eastern.localize(date_.to_pydatetime().replace(hour=16)))
+ for date_ in df.index]
+
+ for column in columns:
+ yahoo_data[column[1]] = map(numpy.asscalar, df[column[0]].values)
+
+ properties = {'yahoo': yahoo_data}
+
+ response = client.create_event(
+ event='$set',
+ entity_type='yahoo',
+ entity_id=ticker,
+ properties=properties,
+ event_time=event_time.replace(tzinfo=pytz.utc))
+
+ print(response)
+
+
+def import_all(app_id):
+ """This method import all SP500 stocks and some SPDR ETFs."""
+ time_slices = [
+ (datetime(1999, 1, 1), datetime(2004, 1, 1), datetime(2004, 1, 2)),
+ (datetime(2003, 12, 1), datetime(2009, 1, 1), datetime(2009, 1, 2)),
+ (datetime(2008, 12, 1), datetime(2014, 9, 1), datetime(2014, 9, 2)),
+ ]
+
+ url = 'http://localhost:7070'
+ client = predictionio.EventClient(app_id=app_id, threads=1, url=url)
+
+ tickers = SP500_LIST + ETF_LIST
+
+ for ticker in tickers:
+ for time_slice in time_slices:
+ import_data(client, app_id, ticker,
+ time_slice[0], time_slice[1], time_slice[2])
+
+
+def import_data_with_gaps(app_id):
+ """This method import data with time gaps.
+
+ Data imported by this method is used by stock engine, it demonstrates how it
+ can handle time series data with gaps.
+ """
+
+ # time_slices is discontinuous
+ # startTime, endTime, eventDate
+ time_slices = [
+ (datetime(2013, 12, 1), datetime(2014, 2, 1), datetime(2014, 2, 2)),
+ (datetime(2014, 1, 1), datetime(2014, 1, 20), datetime(2014, 2, 10)),
+ (datetime(2014, 1, 10), datetime(2014, 2, 20), datetime(2014, 2, 28)),
+ (datetime(2014, 2, 10), datetime(2014, 3, 31), datetime(2014, 4, 2)),
+ (datetime(2014, 5, 1), datetime(2014, 6, 15), datetime(2014, 6, 20)),
+ (datetime(2014, 6, 1), datetime(2014, 7, 1), datetime(2014, 7, 15)),
+ ]
+
+ tickers = ['SPY', 'AAPL', 'IBM', 'MSFT']
+
+ url = 'http://localhost:7070'
+ client = predictionio.EventClient(app_id=app_id, threads=1, url=url)
+
+ for ticker in tickers:
+ for time_slice in time_slices:
+ import_data(client, app_id, ticker,
+ time_slice[0], time_slice[1], time_slice[2])
+
+ # below are data with holes
+ time_slices = [
+ (datetime(2014, 1, 1), datetime(2014, 1, 20), datetime(2014, 2, 10)),
+ (datetime(2014, 2, 10), datetime(2014, 3, 31), datetime(2014, 4, 2)),
+ (datetime(2014, 6, 1), datetime(2014, 7, 1), datetime(2014, 7, 15)),
+ ]
+
+ tickers = ['AMZN']
+ for ticker in tickers:
+ for time_slice in time_slices:
+ import_data(client, app_id, ticker,
+ time_slice[0], time_slice[1], time_slice[2])
+
+ time_slices = [
+ (datetime(2014, 1, 10), datetime(2014, 2, 20), datetime(2014, 2, 28)),
+ (datetime(2014, 2, 10), datetime(2014, 3, 31), datetime(2014, 4, 2)),
+ ]
+ tickers = ['FB']
+ for ticker in tickers:
+ for time_slice in time_slices:
+ import_data(client, app_id, ticker,
+ time_slice[0], time_slice[1], time_slice[2])
+
+
+def import_one(app_id):
+ """Import TSLA.
+
+ Import data from 2014-01-01 until 2014-03-01. event_time specifies when
+ this data is extracted.
+ """
+ start_time = datetime(2014, 1, 1)
+ end_time = datetime(2014, 3, 1)
+ event_time = datetime(2014, 9, 1)
+ ticker = 'TSLA'
+
+ url = 'http://localhost:7070'
+ client = predictionio.EventClient(app_id=app_id, threads=1, url=url)
+
+ import_data(client, app_id, ticker, start_time, end_time, event_time)
+
+
+if __name__ == '__main__':
+ #import_all(app_id=2)
+ import_data_with_gaps(app_id=1)
+ #import_one(app_id=1)
diff --git a/examples/itemrank_quick_query.py b/examples/itemrank_quick_query.py
new file mode 100644
index 0000000..3c43713
--- /dev/null
+++ b/examples/itemrank_quick_query.py
@@ -0,0 +1,23 @@
+"""
+itemrank quickstart query
+"""
+
+import predictionio
+
+client = predictionio.EngineClient("http://localhost:8000")
+
+# Rank item 1 to 5 for each user
+item_ids = [str(i) for i in range(1, 6)]
+user_ids = [str(x) for x in range(1, 6)] + ["NOT_EXIST_USER"]
+for user_id in user_ids:
+ print "Rank item 1 to 5 for user", user_id
+ try:
+ response = client.send_query({
+ "uid": user_id,
+ "iids": item_ids
+ })
+ print response
+ except predictionio.PredictionIOAPIError as e:
+ print 'Caught exception:', e
+
+client.close()
diff --git a/examples/itemrank_quick_start.py b/examples/itemrank_quick_start.py
new file mode 100644
index 0000000..b76abc1
--- /dev/null
+++ b/examples/itemrank_quick_start.py
@@ -0,0 +1,42 @@
+"""
+itemrank quickstart import data
+"""
+
+import predictionio
+
+import random
+
+def import_itemrank(app_id):
+
+ random.seed()
+
+ client = predictionio.EventClient(app_id=app_id)
+
+ print client.get_status()
+
+ # generate 10 users, with user ids 1,2,....,10
+ user_ids = [str(i) for i in range(1, 11)]
+ for user_id in user_ids:
+ print "Set user", user_id
+ client.set_user(user_id)
+
+ # generate 50 items, with item ids 1,2,....,50
+ # assign type id 1 to all of them
+ item_ids = [str(i) for i in range(1, 51)]
+ for item_id in item_ids:
+ print "Set item", item_id
+ client.set_item(item_id, {
+ "pio_itypes" : ['1']
+ })
+
+ # each user randomly views 10 items
+ for user_id in user_ids:
+ for viewed_item in random.sample(item_ids, 10):
+ print "User", user_id ,"views item", viewed_item
+ client.record_user_action_on_item("view", user_id, viewed_item)
+
+ client.close()
+
+
+if __name__ == '__main__':
+ import_itemrank(7)
diff --git a/examples/itemrec/__init__.py b/examples/obsolete/__init__.py
similarity index 100%
copy from examples/itemrec/__init__.py
copy to examples/obsolete/__init__.py
diff --git a/examples/itemrec/__init__.py b/examples/obsolete/itemrec/__init__.py
similarity index 100%
rename from examples/itemrec/__init__.py
rename to examples/obsolete/itemrec/__init__.py
diff --git a/examples/itemrec/movies/.gitignore b/examples/obsolete/itemrec/movies/.gitignore
similarity index 100%
rename from examples/itemrec/movies/.gitignore
rename to examples/obsolete/itemrec/movies/.gitignore
diff --git a/examples/itemrec/movies/README.md b/examples/obsolete/itemrec/movies/README.md
similarity index 100%
rename from examples/itemrec/movies/README.md
rename to examples/obsolete/itemrec/movies/README.md
diff --git a/examples/itemrec/movies/__init__.py b/examples/obsolete/itemrec/movies/__init__.py
similarity index 100%
rename from examples/itemrec/movies/__init__.py
rename to examples/obsolete/itemrec/movies/__init__.py
diff --git a/examples/itemrec/movies/app_config.py b/examples/obsolete/itemrec/movies/app_config.py
similarity index 100%
rename from examples/itemrec/movies/app_config.py
rename to examples/obsolete/itemrec/movies/app_config.py
diff --git a/examples/itemrec/movies/appdata.py b/examples/obsolete/itemrec/movies/appdata.py
similarity index 100%
rename from examples/itemrec/movies/appdata.py
rename to examples/obsolete/itemrec/movies/appdata.py
diff --git a/examples/itemrec/movies/batch_import.py b/examples/obsolete/itemrec/movies/batch_import.py
similarity index 100%
rename from examples/itemrec/movies/batch_import.py
rename to examples/obsolete/itemrec/movies/batch_import.py
diff --git a/examples/itemrec/movies/movie_rec_app.py b/examples/obsolete/itemrec/movies/movie_rec_app.py
similarity index 100%
rename from examples/itemrec/movies/movie_rec_app.py
rename to examples/obsolete/itemrec/movies/movie_rec_app.py
diff --git a/predictionio/__init__.py b/predictionio/__init__.py
index 30e39e6..d7c369f 100644
--- a/predictionio/__init__.py
+++ b/predictionio/__init__.py
@@ -9,8 +9,10 @@
__copyright__ = "Copyright 2014, TappingStone, Inc."
__license__ = "Apache License, Version 2.0"
-__version__ = "0.7.0"
+__version__ = "0.8.0"
+# import deprecated libraries.
+from predictionio.obsolete import Client
# import packages
import re
@@ -23,138 +25,59 @@
import json
import urllib
+from datetime import datetime
+import pytz
+
from predictionio.connection import Connection
from predictionio.connection import AsyncRequest
+from predictionio.connection import AsyncResponse
from predictionio.connection import PredictionIOAPIError
-"""Error exception defined for this API
-Should be handled by the user
-"""
-
-
-class ServerStatusError(PredictionIOAPIError):
-
- "Error happened when tried to get status of the API server"
+class NotCreatedError(PredictionIOAPIError):
pass
-class UserNotCreatedError(PredictionIOAPIError):
-
- "Error happened when tried to create user"
+class NotFoundError(PredictionIOAPIError):
pass
-class UserNotFoundError(PredictionIOAPIError):
-
- "Error happened when tried to get user"
- pass
-
-
-class UserNotDeletedError(PredictionIOAPIError):
-
- "Error happened when tried to delete user"
- pass
-
-
-class ItemNotCreatedError(PredictionIOAPIError):
-
- "Error happened when tried to create item"
- pass
-
-
-class ItemNotFoundError(PredictionIOAPIError):
-
- "Error happened when tried to get item"
- pass
-
-
-class ItemNotDeletedError(PredictionIOAPIError):
-
- "Error happened when tried to delete item"
- pass
-
-
-class U2IActionNotCreatedError(PredictionIOAPIError):
-
- "Error happened when tried to create user-to-item action"
- pass
-
-
-class ItemRecNotFoundError(PredictionIOAPIError):
-
- "Error happened when tried to get item recommendation"
- pass
-
-
-class ItemSimNotFoundError(PredictionIOAPIError):
-
- "Error happened when tried to get similar items"
- pass
-
-class ItemRankNotFoundError(PredictionIOAPIError):
-
- "Error happened when treid to rank item"
- pass
-
-class InvalidArgumentError(PredictionIOAPIError):
-
- "Arguments are not valid"
- pass
-
-# map to API
-LIKE_API = "like"
-DISLIKE_API = "dislike"
-VIEW_API = "view"
-CONVERSION_API = "conversion"
-RATE_API = "rate"
-
-
-class Client(object):
-
- """PredictionIO client object.
-
- This is an object representing a PredictionIO's client. This object
- provides methods for making PredictionIO API requests.
-
- :param appkey: the App Key provided by PredictionIO.
- :param threads: number of threads to handle PredictionIO API requests.
- Must be >= 1.
- :param apiurl: the PredictionIO API URL path.
- :param apiversion: the PredictionIO API version. (optional)
- (eg. "", or "/v1")
- :param qsize: the max size of the request queue (optional).
- The asynchronous request becomes blocking once this size has been
- reached, until the queued requests are handled.
- Default value is 0, which means infinite queue size.
- :param timeout: timeout for HTTP connection attempts and requests in
- seconds (optional).
- Default value is 5.
-
+def event_time_validation(t):
+ """ Validate event_time according to EventAPI Specification.
"""
- def __init__(self, appkey, threads=1, apiurl="http://localhost:8000",
- apiversion="", qsize=0, timeout=5):
+ if t is None:
+ return datetime.now(pytz.utc)
+ # isinstance() also admits datetime subclasses, which type() != rejected.
+ if not isinstance(t, datetime):
+ raise AttributeError("event_time must be datetime.datetime")
+ # Naive datetimes are rejected: a tzinfo is required.
+ if t.tzinfo is None:
+ raise AttributeError("event_time must have tzinfo")
+
+ return t
+
+
+class BaseClient(object):
+ def __init__(self, url, threads=1, qsize=0, timeout=5):
"""Constructor of Client object.
"""
- self.appkey = appkey
self.threads = threads
- self.apiurl = apiurl
- self.apiversion = apiversion
+ self.url = url
self.qsize = qsize
self.timeout = timeout
# check connection type
https_pattern = r'^https://(.*)'
http_pattern = r'^http://(.*)'
- m = re.match(https_pattern, apiurl)
+ m = re.match(https_pattern, url)
self.https = True
if m is None: # not matching https
- m = re.match(http_pattern, apiurl)
+ m = re.match(http_pattern, url)
self.https = False
if m is None: # not matching http either
- raise InvalidArgumentError("apiurl is not valid: %s" % apiurl)
+ raise PredictionIOAPIError("url is not valid: %s" % url)
self.host = m.group(1)
self._uid = None # identified uid
@@ -179,13 +102,6 @@
"""
return self._connection.pending_requests()
- def identify(self, uid):
- """Identify the uid
-
- :param uid: user id. type str.
- """
- self._uid = uid
-
def get_status(self):
"""Get the status of the PredictionIO API Server
@@ -197,1018 +113,337 @@
"""
path = "/"
request = AsyncRequest("GET", path)
- request.set_rfunc(self._aget_status_resp)
+ request.set_rfunc(self._aget_resp)
self._connection.make_request(request)
- result = self.aresp(request)
+ result = request.get_response()
return result
- def _aget_status_resp(self, response):
- """Handle the AsyncResponse of get status request"""
+ def _acreate_resp(self, response):
if response.error is not None:
- raise ServerStatusError("Exception happened: %s for request %s" %
+ raise NotCreatedError("Exception happened: %s for request %s" %
+ (response.error, response.request))
+ elif response.status != httplib.CREATED:
+ raise NotCreatedError("request: %s status: %s body: %s" %
+ (response.request, response.status,
+ response.body))
+
+ return response
+
+ def _aget_resp(self, response):
+ if response.error is not None:
+ raise NotFoundError("Exception happened: %s for request %s" %
(response.error, response.request))
elif response.status != httplib.OK:
- raise ServerStatusError("request: %s status: %s body: %s" %
+ raise NotFoundError("request: %s status: %s body: %s" %
(response.request, response.status,
response.body))
- # data = json.loads(response.body) # convert json string to dict
+ return response.json_body
+
+ def _adelete_resp(self, response):
+ if response.error is not None:
+ raise NotFoundError("Exception happened: %s for request %s" %
+ (response.error, response.request))
+ elif response.status != httplib.OK:
+ raise NotFoundError("request: %s status: %s body: %s" %
+ (response.request, response.status,
+ response.body))
+
return response.body
- def acreate_user(self, uid, params={}):
- """Asynchronously create a user.
- :param uid: user id. type str.
- :param params: optional attributes. type dictionary.
- For example,
- { 'custom': 'value', 'pio_inactive' : True,
- 'pio_latlng': [4.5,67.8] }
+class EventClient(BaseClient):
+ """Client for importing data into PredictionIO Event Server.
+
+ :param app_id: the id used to identify application data.
+ :param url: the url of PredictionIO Event Server.
+ :param threads: number of threads to handle PredictionIO API requests.
+ Must be >= 1.
+ :param qsize: the max size of the request queue (optional).
+ The asynchronous request becomes blocking once this size has been
+ reached, until the queued requests are handled.
+ Default value is 0, which means infinite queue size.
+ :param timeout: timeout for HTTP connection attempts and requests in
+ seconds (optional).
+ Default value is 5.
+
+ """
+
+ def __init__(self, app_id, url="http://localhost:7070",
+ threads=1, qsize=0, timeout=5):
+ super(EventClient, self).__init__(url, threads, qsize, timeout)
+ self.app_id = app_id
+
+ def acreate_event(self, event, entity_type, entity_id,
+ target_entity_type=None, target_entity_id=None, properties=None,
+ event_time=None):
+ """Asynchronously create an event.
+
+ :param event: event name. type str.
+ :param entity_type: entity type. It is the namespace of the entityId and
+ analogous to the table name of a relational database. The entityId must be
+ unique within same entityType. type str.
+ :param entity_id: entity id. *entity_type-entity_id* becomes the unique
+ identifier of the entity. For example, you may have entityType named user,
+ and different entity IDs, say 1 and 2. In this case, user-1 and user-2
+ uniquely identifies entities. type str
+ :param target_entity_type: target entity type. type str.
+ :param target_entity_id: target entity id. type str.
+ :param properties: a custom dict associated with an event. type dict.
+ :param event_time: the time of the event. type datetime, must contain
+ timezone info.
:returns:
- AsyncRequest object. You should call the aresp() method using this
- AsyncRequest object as argument to get the final result or status
- of this asynchronous request.
-
+ AsyncRequest object. You can call the get_response() method using this
+ object to get the final results or status of this asynchronous request.
"""
+ data = {
+ "appId": self.app_id,
+ "event": event,
+ "entityType": entity_type,
+ "entityId": entity_id,
+ }
- if "pio_latlng" in params:
- params["pio_latlng"] = ",".join(map(str, params["pio_latlng"]))
- if "pio_inactive" in params:
- params["pio_inactive"] = str(params["pio_inactive"]).lower()
+ if target_entity_type is not None:
+ data["targetEntityType"] = target_entity_type
- path = "%s/users.json" % self.apiversion
- request = AsyncRequest(
- "POST", path, pio_appkey=self.appkey, pio_uid=uid, **params)
- request.set_rfunc(self._acreate_user_resp)
- self._connection.make_request(request)
+ if target_entity_id is not None:
+ data["targetEntityId"] = target_entity_id
- return request
+ if properties is not None:
+ data["properties"] = properties
- def _acreate_user_resp(self, response):
- """Private function to handle the AsyncResponse of the acreate_user
- request.
-
- :param response: AsyncResponse object.
-
- :returns:
- None.
-
- :raises:
- UserNotCreatedError.
-
- """
- if response.error is not None:
- raise UserNotCreatedError("Exception happened: %s for request %s" %
- (response.error, response.request))
- elif response.status != httplib.CREATED:
- raise UserNotCreatedError("request: %s status: %s body: %s" %
- (response.request, response.status,
- response.body))
-
- return None
-
- def aget_user(self, uid):
- """Asynchronously get user.
-
- :param uid: user id. type str.
-
- :returns:
- AsyncRequest object. You should call the aresp() method using this
- AsyncRequest object as argument to get the final result or status
- of this asynchronous request.
- """
-
- enc_uid = urllib.quote(uid, "") # replace special char with %xx
- path = "%s/users/%s.json" % (self.apiversion, enc_uid)
- request = AsyncRequest("GET", path, pio_appkey=self.appkey)
- request.set_rfunc(self._aget_user_resp)
- self._connection.make_request(request)
-
- return request
-
- def _aget_user_resp(self, response):
- """Private function to handle the AsyncResponse of the aget_user
- request .
-
- :param response: AsyncResponse object.
-
- :returns:
- User data in Dictionary format.
-
- :rasies:
- UserNotFoundError.
-
- """
- if response.error is not None:
- raise UserNotFoundError("Exception happened: %s for request %s" %
- (response.error, response.request))
- elif response.status != httplib.OK:
- raise UserNotFoundError("request: %s status: %s body: %s" %
- (response.request, response.status,
- response.body))
-
- data = json.loads(response.body) # convert json string to dict
- return data
-
- def adelete_user(self, uid):
- """Asynchronously delete user.
-
- :param uid: user id. type str.
-
- :returns:
- AsyncRequest object. You should call the aresp() method using this
- AsyncRequest object as argument to get the final result or status
- of this asynchronous request.
- """
-
- enc_uid = urllib.quote(uid, "") # replace special char with %xx
- path = "%s/users/%s.json" % (self.apiversion, enc_uid)
- request = AsyncRequest("DELETE", path, pio_appkey=self.appkey)
- request.set_rfunc(self._adelete_user_resp)
- self._connection.make_request(request)
-
- return request
-
- def _adelete_user_resp(self, response):
- """Private function to handle the AsyncResponse of the adelete_user
- request.
-
- :param response: AsyncResponse object.
-
- :returns:
- None.
-
- :raises:
- UserNotDeletedError.
-
- """
- if response.error is not None:
- raise UserNotDeletedError("Exception happened: %s for request %s" %
- (response.error, response.request))
- elif response.status != httplib.OK:
- raise UserNotDeletedError("request: %s status: %s body: %s" %
- (response.request, response.status,
- response.body))
- return None
-
- def acreate_item(self, iid, itypes, params={}):
- """Asynchronously create item.
-
- :param iid: item id. type str.
- :param itypes: item types. Tuple of Str.
- For example, if this item belongs to item types "t1", "t2",
- "t3", "t4",then itypes=("t1", "t2", "t3", "t4").
- NOTE: if this item belongs to only one itype, use tuple of one
- element, eg. itypes=("t1",)
- :param params: optional attributes. type dictionary.
- For example,
- { 'custom': 'value', 'pio_inactive' : True,
- 'pio_latlng': [4.5,67.8] }
- :returns:
- AsyncRequest object. You should call the aresp() method using this
- AsyncRequest object as argument to get the final result or status
- of this asynchronous request.
- """
- itypes_str = ",".join(itypes) # join items with ","
-
- if "pio_latlng" in params:
- params["pio_latlng"] = ",".join(map(str, params["pio_latlng"]))
- if "pio_inactive" in params:
- params["pio_inactive"] = str(params["pio_inactive"]).lower()
-
- path = "%s/items.json" % self.apiversion
- request = AsyncRequest("POST", path, pio_appkey=self.appkey,
- pio_iid=iid, pio_itypes=itypes_str, **params)
- request.set_rfunc(self._acreate_item_resp)
+ et = event_time_validation(event_time)
+ # EventServer uses milliseconds, but Python's datetime uses microseconds.
+ # Hence we need to drop the last three digits.
+ et_str = et.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + et.strftime("%z")
+ data["eventTime"] = et_str
+
+ path = "/events.json"
+ request = AsyncRequest("POST", path, **data)
+ request.set_rfunc(self._acreate_resp)
self._connection.make_request(request)
return request
- def _acreate_item_resp(self, response):
- """Private function to handle the AsyncResponse of the acreate_item
- request
+ def create_event(self, event, entity_type, entity_id,
+ target_entity_type=None, target_entity_id=None, properties=None,
+ event_time=None):
+ """Synchronously (blocking) create an event."""
+ return self.acreate_event(event, entity_type, entity_id,
+ target_entity_type, target_entity_id, properties,
+ event_time).get_response()
- :param response: AsyncResponse object.
+ def aget_event(self, event_id):
+ """Asynchronously get an event from Event Server.
+
+ :param event_id: event id returned by the EventServer when creating the
+ event.
:returns:
- None
- :raises:
- ItemNotCreatedError
-
+ AsyncRequest object.
"""
- if response.error is not None:
- raise ItemNotCreatedError("Exception happened: %s for request %s" %
- (response.error, response.request))
- elif response.status != httplib.CREATED:
- raise ItemNotCreatedError("request: %s status: %s body: %s" %
- (response.request, response.status,
- response.body))
- return None
-
- def aget_item(self, iid):
- """Asynchronously get item
-
- :param iid: item id. type str.
-
- :returns:
- AsyncRequest object. You should call the aresp() method using this
- AsyncRequest object as argument to get the final result or status
- of this asynchronous request.
- """
- enc_iid = urllib.quote(iid, "")
- path = "%s/items/%s.json" % (self.apiversion, enc_iid)
- request = AsyncRequest("GET", path, pio_appkey=self.appkey)
- request.set_rfunc(self._aget_item_resp)
+ enc_event_id = urllib.quote(event_id, "") # replace special char with %xx
+ path = "/events/%s.json" % enc_event_id
+ request = AsyncRequest("GET", path)
+ request.set_rfunc(self._aget_resp)
self._connection.make_request(request)
return request
- def _aget_item_resp(self, response):
- """Private function to handle the AsyncResponse of the aget_item
- request
+ def get_event(self, event_id):
+ """Synchronously get an event from Event Server."""
+ return self.aget_event(event_id).get_response()
- :param response: AsyncResponse object.
+ def adelete_event(self, event_id):
+ """Asynchronously delete an event from Event Server.
+
+ :param event_id: event id returned by the EventServer when creating the
+ event.
:returns:
- item data in dictionary format.
-
- :raises:
- ItemNotFoundError.
-
+ AsyncRequest object.
"""
- if response.error is not None:
- raise ItemNotFoundError("Exception happened: %s for request %s" %
- (response.error, response.request))
- elif response.status != httplib.OK:
- raise ItemNotFoundError("request: %s status: %s body: %s" %
- (response.request, response.status,
- response.body))
-
- data = json.loads(response.body) # convert json string to dict
- if "pio_itypes" in data:
- # convert from list to tuple
- data["pio_itypes"] = tuple(data["pio_itypes"])
-
- return data
-
- def adelete_item(self, iid):
- """Asynchronously delete item
-
- :param iid: item id. type str.
-
- :returns:
- AsyncRequest object. You should call the aresp() method using this
- AsyncRequest object as argument to get the final result or status
- of this asynchronous request.
- """
-
- enc_iid = urllib.quote(iid, "")
- path = "%s/items/%s.json" % (self.apiversion, enc_iid)
- request = AsyncRequest("DELETE", path, pio_appkey=self.appkey)
- request.set_rfunc(self._adelete_item_resp)
+ enc_event_id = urllib.quote(event_id, "") # replace special char with %xx
+ path = "/events/%s.json" % enc_event_id
+ request = AsyncRequest("DELETE", path)
+ request.set_rfunc(self._adelete_resp)
self._connection.make_request(request)
return request
- def _adelete_item_resp(self, response):
- """Private function to handle the AsyncResponse of the adelete_item
- request
+ def delete_event(self, event_id):
+ """Synchronously delete an event from Event Server."""
+ return self.adelete_event(event_id).get_response()
- :param response: AsyncResponse object
+ ## Below are helper functions
+
+ def aset_user(self, uid, properties={}, event_time=None):
+ """Set properties of a user.
+
+ Wrapper of acreate_event function, setting event to "$set" and entity_type
+ to "pio_user".
+ """
+ return self.acreate_event(
+ event="$set",
+ entity_type="pio_user",
+ entity_id=uid,
+ properties=properties,
+ event_time=event_time,
+ )
+
+ def set_user(self, uid, properties={}, event_time=None):
+ """Set properties of a user"""
+ return self.aset_user(uid, properties, event_time).get_response()
+
+ def aunset_user(self, uid, properties, event_time=None):
+ """Unset properties of a user.
+
+ Wrapper of acreate_event function, setting event to "$unset" and entity_type
+ to "pio_user".
+ """
+ # check properties={}, it cannot be empty
+ return self.acreate_event(
+ event="$unset",
+ entity_type="pio_user",
+ entity_id=uid,
+ properties=properties,
+ event_time=event_time,
+ )
+
+ def unset_user(self, uid, properties, event_time=None):
+ """Unset properties of a user."""
+ return self.aunset_user(uid, properties, event_time).get_response()
+
+ def adelete_user(self, uid, event_time=None):
+ """Delete a user.
+
+ Wrapper of acreate_event function, setting event to "$delete" and entity_type
+ to "pio_user".
+ """
+ return self.acreate_event(
+ event="$delete",
+ entity_type="pio_user",
+ entity_id=uid,
+ event_time=event_time)
+
+ def delete_user(self, uid, event_time=None):
+ """Delete a user."""
+ return self.adelete_user(uid, event_time).get_response()
+
+ def aset_item(self, iid, properties={}, event_time=None):
+ """Set properties of an item.
+
+ Wrapper of acreate_event function, setting event to "$set" and entity_type
+ to "pio_item".
+ """
+ return self.acreate_event(
+ event="$set",
+ entity_type="pio_item",
+ entity_id=iid,
+ properties=properties,
+ event_time=event_time)
+
+ def set_item(self, iid, properties={}, event_time=None):
+ """Set properties of an item."""
+ return self.aset_item(iid, properties, event_time).get_response()
+
+ def aunset_item(self, iid, properties={}, event_time=None):
+ """Unset properties of an item.
+
+ Wrapper of acreate_event function, setting event to "$unset" and entity_type
+ to "pio_item".
+ """
+ return self.acreate_event(
+ event="$unset",
+ entity_type="pio_item",
+ entity_id=iid,
+ properties=properties,
+ event_time=event_time)
+
+ def unset_item(self, iid, properties={}, event_time=None):
+ """Unset properties of an item."""
+ return self.aunset_item(iid, properties, event_time).get_response()
+
+ def adelete_item(self, iid, event_time=None):
+ """Delete an item.
+
+ Wrapper of acreate_event function, setting event to "$delete" and entity_type
+ to "pio_item".
+ """
+ return self.acreate_event(
+ event="$delete",
+ entity_type="pio_item",
+ entity_id=iid,
+ event_time=event_time)
+
+ def delete_item(self, iid, event_time=None):
+ """Delete an item."""
+ return self.adelete_item(iid, event_time).get_response()
+
+ def arecord_user_action_on_item(self, action, uid, iid, properties={},
+ event_time=None):
+ """Create a user-to-item action.
+
+ Wrapper of acreate_event function, setting entity_type to "pio_user" and
+ target_entity_type to "pio_item".
+ """
+ return self.acreate_event(
+ event=action,
+ entity_type="pio_user",
+ entity_id=uid,
+ target_entity_type="pio_item",
+ target_entity_id=iid,
+ properties=properties,
+ event_time=event_time)
+
+ def record_user_action_on_item(self, action, uid, iid, properties={},
+ event_time=None):
+ """Create a user-to-item action."""
+ return self.arecord_user_action_on_item(
+ action, uid, iid, properties, event_time).get_response()
+
+
+class EngineClient(BaseClient):
+ """Client for extracting prediction results from a PredictionIO Engine
+ Instance.
+
+ :param url: the url of the PredictionIO Engine Instance.
+ :param threads: number of threads to handle PredictionIO API requests.
+ Must be >= 1.
+ :param qsize: the max size of the request queue (optional).
+ The asynchronous request becomes blocking once this size has been
+ reached, until the queued requests are handled.
+ Default value is 0, which means infinite queue size.
+ :param timeout: timeout for HTTP connection attempts and requests in
+ seconds (optional).
+ Default value is 5.
+
+ """
+ def __init__(self, url="http://localhost:8000", threads=1,
+ qsize=0, timeout=5):
+ super(EngineClient, self).__init__(url, threads, qsize, timeout)
+
+ def asend_query(self, data):
+ """Asynchronously send a request to the engine instance with data as the
+ query.
+
+ :param data: the query. It is converted to a JSON object using the
+ json.dumps method. type dict.
:returns:
- None
-
- :raises:
- ItemNotDeletedError
+ AsyncRequest object. You can call the get_response() method using this
+ object to get the final results or status of this asynchronous request.
"""
- if response.error is not None:
- raise ItemNotDeletedError("Exception happened: %s for request %s" %
- (response.error, response.request))
- elif response.status != httplib.OK:
- raise ItemNotDeletedError("request: %s status: %s body: %s" %
- (response.request, response.status,
- response.body))
- return None
-
- def _aget_user_itemrec_topn(self, engine, uid, n, params={}):
- """Private function to asynchronously get recommendations for user
-
- :param engine: name of the prediction engine. type str.
- :param uid: user id. type str.
- :param n: number of recommendation. type int.
- :param params: optional parameters. type dictionary
- For example, { 'pio_itypes' : ("t1","t2") }
- :returns:
- AsyncRequest object. You should call the aresp() method using this
- AsyncRequest object as argument to get the final result or status
- of this asynchronous request.
- """
- if "pio_itypes" in params:
- params["pio_itypes"] = ",".join(params["pio_itypes"])
- if "pio_latlng" in params:
- params["pio_latlng"] = ",".join(map(str, params["pio_latlng"]))
- if "pio_attributes" in params:
- params["pio_attributes"] = ",".join(params["pio_attributes"])
-
- path = "%s/engines/itemrec/%s/topn.json" % (self.apiversion, engine)
- request = AsyncRequest("GET", path, pio_appkey=self.appkey,
- pio_uid=uid, pio_n=n, **params)
- request.set_rfunc(self._aget_user_itemrec_topn_resp)
+ path = "/queries.json"
+ request = AsyncRequest("POST", path, **data)
+ request.set_rfunc(self._aget_resp)
self._connection.make_request(request)
return request
- def _aget_user_itemrec_topn_resp(self, response):
- """Private function to handle the AsyncResponse of the aget_itemrec
- request
+ def send_query(self, data):
+ """Synchronously send a request.
+
+ :param data: the query. It is converted to a JSON object using the
+ json.dumps method. type dict.
- :param response: AsyncResponse object
-
- :returns:
- data in dictionary format.
-
- :raises:
- ItemRecNotFoundError.
+ :returns: the prediction.
"""
- if response.error is not None:
- raise ItemRecNotFoundError(
- "Exception happened: %s for request %s" %
- (response.error, response.request))
- elif response.status != httplib.OK:
- raise ItemRecNotFoundError("request: %s status: %s body: %s" %
- (response.request, response.status,
- response.body))
-
- data = json.loads(response.body) # convert json string to dict
- return data
-
- def aget_itemrec_topn(self, engine, n, params={}):
- """Asynchronously get recommendations for the identified user
-
- :param engine: name of the prediction engine. type str.
- :param n: number of recommendation. type int.
- :param params: optional parameters. type dictionary
- For example, { 'pio_itypes' : ("t1",) }
- :returns:
- AsyncRequest object. You should call the aresp() method using this
- AsyncRequest object as argument to get the final result or status
- of this asynchronous request.
- """
-
- if self._uid is None:
- raise InvalidArgumentError(
- "uid is not identified. Please call identify(uid) first.")
-
- request = self._aget_user_itemrec_topn(engine, self._uid, n, params)
- return request
-
- def aget_itemrec(self, uid, n, engine, **params):
- """Deprecated. Asynchronously get recommendations
-
- :param uid: user id. type str.
- :param n: number of recommendation. type int.
- :param engine: name of the prediction engine. type str.
- :param params: keyword arguments for optional attributes.
- For example, pio_latlng="123.4, 56.7"
- :returns:
- AsyncRequest object. You should call the aresp() method using this
- AsyncRequest object as argument to get the final result or status
- of this asynchronous request.
- """
- request = self._aget_user_itemrec_topn(engine, uid, n, params)
- return request
-
- def _aget_itemsim_topn(self, engine, iid, n, params={}):
- """Private function to asynchronously get top n similar items of the
- item
-
- :param engine: name of the prediction engine. type str.
- :param iid: item id. type str.
- :param n: number of similar items. type int.
- :param params: optional parameters. type dictionary
- For example, { 'pio_itypes' : ("t1","t2") }
- :returns:
- AsyncRequest object. You should call the aresp() method using this
- AsyncRequest object as argument to get the final result or status
- of this asynchronous request.
- """
- if "pio_itypes" in params:
- params["pio_itypes"] = ",".join(params["pio_itypes"])
- if "pio_latlng" in params:
- params["pio_latlng"] = ",".join(map(str, params["pio_latlng"]))
- if "pio_attributes" in params:
- params["pio_attributes"] = ",".join(params["pio_attributes"])
-
- path = "%s/engines/itemsim/%s/topn.json" % (self.apiversion, engine)
- request = AsyncRequest("GET", path, pio_appkey=self.appkey,
- pio_iid=iid, pio_n=n, **params)
- request.set_rfunc(self._aget_itemsim_topn_resp)
- self._connection.make_request(request)
- return request
-
- def _aget_itemsim_topn_resp(self, response):
- """Private function to handle the AsyncResponse of the aget_itemsim
- request
-
- :param response: AsyncResponse object
-
- :returns:
- data in dictionary format.
-
- :raises:
- ItemSimNotFoundError.
- """
- if response.error is not None:
- raise ItemSimNotFoundError(
- "Exception happened: %s for request %s" %
- (response.error, response.request))
- elif response.status != httplib.OK:
- raise ItemSimNotFoundError("request: %s status: %s body: %s" %
- (response.request, response.status,
- response.body))
-
- data = json.loads(response.body) # convert json string to dict
- return data
-
- def aget_itemsim_topn(self, engine, iid, n, params={}):
- """Asynchronously get top n similar items of the item
-
- :param engine: name of the prediction engine. type str.
- :param iid: item id. type str.
- :param n: number of similar items. type int.
- :param params: optional parameters. type dictionary
- For example, { 'pio_itypes' : ("t1",) }
- :returns:
- AsyncRequest object. You should call the aresp() method using this
- AsyncRequest object as argument to get the final result or status
- of this asynchronous request.
- """
-
- request = self._aget_itemsim_topn(engine, iid, n, params)
- return request
-
- def _aget_user_itemrank_ranked(self, engine, uid, iids, params={}):
- """Private function to asynchronously get ranked item for user
-
- :param engine: name of the prediction engine. type str.
- :param uid: user id. type str.
- :param iids: items to be ranked. type list of item ids.
- For example, ["i0", "i1", "i2"]
- :param params: optional parameters. type dictionary
- For example. { 'pio_attributes' : "name" }
- :returns:
- AsyncRequest object. You should call the aresp() method using this
- AsyncRequest object as argument to get the final result or status
- of this asynchronous request.
- """
- if "pio_attributes" in params:
- params["pio_attributes"] = ",".join(params["pio_attributes"])
-
- pio_iids = ",".join(iids)
-
- path = "%s/engines/itemrank/%s/ranked.json" % \
- (self.apiversion, engine)
- request = AsyncRequest("GET", path, pio_appkey=self.appkey,
- pio_uid=uid, pio_iids=pio_iids, **params)
- request.set_rfunc(self._aget_user_itemrank_ranked_resp)
- self._connection.make_request(request)
- return request
-
- def _aget_user_itemrank_ranked_resp(self, response):
- """Private function to handle the AsyncResponse of the aget_itemreoder
- request
-
- :param response: AsyncResponse object
-
- :returns:
- data in dictionary format.
-
- :raises:
- ItemRankNotFoundError.
- """
- if response.error is not None:
- raise ItemRankNotFoundError(
- "Exception happened: %s for request %s" %
- (response.error, response.request))
- elif response.status != httplib.OK:
- raise ItemRankNotFoundError("request: %s status: %s body: %s" %
- (response.request, response.status,
- response.body))
-
- data = json.loads(response.body) # convert json string to dict
- return data
-
- def aget_itemrank_ranked(self, engine, iids, params={}):
- """Asynchronously get ranked item for user
-
- :param engine: name of the prediction engine. type str.
- :param iids: items to be ranked. type list of item ids.
- For example, ["i0", "i1", "i2"]
- :param params: optional parameters. type dictionary
- For example. { 'pio_attributes' : "name" }
- :returns:
- AsyncRequest object. You should call the aresp() method using this
- AsyncRequest object as argument to get the final result or status
- of this asynchronous request.
- """
-
- if self._uid is None:
- raise InvalidArgumentError(
- "uid is not identified. Please call identify(uid) first.")
-
- request = self._aget_user_itemrank_ranked(engine,
- self._uid, iids, params)
- return request
-
- def _auser_action_on_item(self, action, uid, iid, params):
- """Private function to asynchronously create an user action on an item
-
- :param action: action type. type str. ("like", "dislike", "conversion",
- "rate", "view")
- :param uid: user id. type str or int.
- :param iid: item id. type str or int.
- :param params: optional attributes. type dictionary.
- For example, { 'pio_rate' : 4, 'pio_latlng' : [1.23,4.56] }
- NOTE: For "rate" action, pio_rate attribute is required.
- integer value of 1-5 (1 is least preferred and 5 is most
- preferred)
-
- :returns:
- AsyncRequest object. You should call the aresp() method using this
- AsyncRequest object as argument to get the final result or status
- of this asynchronous request.
- """
- if "pio_latlng" in params:
- params["pio_latlng"] = ",".join(map(str, params["pio_latlng"]))
-
- path = "%s/actions/u2i.json" % (self.apiversion)
- request = AsyncRequest("POST", path, pio_appkey=self.appkey,
- pio_action=action, pio_uid=uid, pio_iid=iid,
- **params)
- request.set_rfunc(self._auser_action_on_item_resp)
- self._connection.make_request(request)
- return request
-
- def _auser_action_on_item_resp(self, response):
- """Private function to handle the AsyncResponse of the
- _auser_action_on_item request
-
- :param response: AsyncResponse object
-
- :returns:
- None
-
- :raises:
- U2IActionNotCreatedError
- """
- if response.error is not None:
- raise U2IActionNotCreatedError(
- "Exception happened: %s for request %s" %
- (response.error, response.request))
- elif response.status != httplib.CREATED:
- raise U2IActionNotCreatedError("request: %s status: %s body: %s" %
- (response.request, response.status,
- response.body))
- return None
-
- def arecord_action_on_item(self, action, iid, params={}):
- """Asynchronously create action on item
-
- :param action: action name. type String. For example, "rate", "like",
- etc
- :param iid: item id. type str or int.
- :param params: optional attributes. type dictionary.
- For example, { 'pio_rate' : 4, 'pio_latlng': [4.5,67.8] }
- NOTE: For "rate" action, pio_rate attribute is required.
- integer value of 1-5 (1 is least preferred and 5 is most
- preferred)
-
- :returns:
- AsyncRequest object. You should call the aresp() method using this
- AsyncRequest object as argument to get the final result or status
- of this asynchronous request.
-
- :raises:
- U2IActionNotCreatedError
- """
-
- if self._uid is None:
- raise InvalidArgumentError(
- "uid is not identified. Please call identify(uid) first.")
-
- request = self._auser_action_on_item(action, self._uid, iid, params)
- return request
-
- def auser_conversion_item(self, uid, iid, **params):
- """Deprecated. Asynchronously create an user conversion action on an
- item
-
- :param uid: user id. type str.
- :param iid: item id. type str.
- :param params: keyword arguments for optional attributes.
- For example, pio_latlng=[123.4, 56.7]
- :returns:
- AsyncRequest object. You should call the aresp() method using this
- AsyncRequest object as argument to get the final result or status
- of this asynchronous request.
- """
- request = self._auser_action_on_item(CONVERSION_API, uid, iid, params)
- return request
-
- def auser_dislike_item(self, uid, iid, **params):
- """Deprecated. Asynchronously create an user dislike action on an item
-
- :param uid: user id. type str.
- :param iid: item id. type str.
- :param params: keyword arguments for optional attributes.
- For example, pio_latlng=[123.4, 56.7]
- :returns:
- AsyncRequest object. You should call the aresp() method using this
- AsyncRequest object as argument to get the final result or status
- of this asynchronous request.
- """
- request = self._auser_action_on_item(DISLIKE_API, uid, iid, params)
- return request
-
- def auser_like_item(self, uid, iid, **params):
- """Deprecated. Asynchronously create an user like action on an item
-
- :param uid: user id. type str.
- :param iid: item id. type str.
- :param params: keyword arguments for optional attributes.
- For example, pio_latlng=[123.4, 56.7]
- :returns:
- AsyncRequest object. You should call the aresp() method using this
- AsyncRequest object as argument to get the final result or status
- of this asynchronous request.
- """
- request = self._auser_action_on_item(LIKE_API, uid, iid, params)
- return request
-
- def auser_rate_item(self, uid, iid, rate, **params):
- """Deprecated. Asynchronously create an user rate action on an item
-
- :param uid: user id. type str.
- :param iid: item id. type str.
- :param rate: rating. integer value of 1-5 (1 is least preferred and 5
- is most preferred)
- :param params: keyword arguments for optional attributes.
- For example, pio_latlng=[123.4, 56.7]
- :returns:
- AsyncRequest object. You should call the aresp() method using this
- AsyncRequest object as argument to get the final result or status
- of this asynchronous request.
- """
-
- params['pio_rate'] = rate
- request = self._auser_action_on_item(RATE_API, uid, iid, params)
- return request
-
- def auser_view_item(self, uid, iid, **params):
- """Deprecated. Asynchronously create an user view action on an item
-
- :param uid: user id. type str.
- :param iid: item id. type str.
- :param params: keyword arguments for optional attributes.
- For example, pio_latlng=[123.4, 56.7]
- :returns:
- AsyncRequest object. You should call the aresp() method using this
- AsyncRequest object as argument to get the final result or status
- of this asynchronous request.
- """
- request = self._auser_action_on_item(VIEW_API, uid, iid, params)
- return request
-
- def aresp(self, request):
- """Get the result of the asynchronous request
-
- :param request: AsyncRequest object. This object must be returned by
- the asynchronous request function
- For example, to get the result of a aget_user()
- request, call this aresp() with the argument of
- AsyncRequest object returned by aget_user().
-
- :returns:
- The result of this AsyncRequest. The return type is the same as
- the return type of corresponding blocking request.
-
- For example,
-
- Calling aresp() with acreate_user() AsyncRequest returns the same
- type as create_user(), which is None.
-
- Calling aresp() with aget_user() AsyncRequest returns the same
- type as get_user(), which is dictionary data.
-
- :raises:
- Exception may be raised if there is error happened. The type of
- exception is the same as exception type of the correspdoning
- blocking request.
-
- For example,
-
- Calling aresp() with acreate_user() AsyncRequest may raise
- UserNotCreatedError exception.
-
- Calling aresp() with aget_user() AsyncRequest may raise
- UserNotFoundError exception.
-
- """
- response = request.get_response()
- result = request.rfunc(response)
- return result
-
- def create_user(self, uid, params={}):
- """Blocking request to create user
-
- :param uid: user id. type str.
- :param params: optional attributes. type dictionary.
- For example, { 'custom': 'value', 'pio_inactive' : True,
- 'pio_latlng': [4.5,67.8] }
-
- :returns:
- None.
-
- :raises:
- UserNotCreatedError.
-
- """
- request = self.acreate_user(uid, params)
- result = self.aresp(request)
- return result
-
- def get_user(self, uid):
- """Blocking request to get user
-
- :param uid: user id. type str or int.
-
- :returns:
- User data in Dictionary format.
-
- :rasies:
- UserNotFoundError.
-
- """
- request = self.aget_user(uid)
- result = self.aresp(request)
- return result
-
- def delete_user(self, uid):
- """Blocking request to delete the user
-
- :param uid: user id. type str.
-
- :returns:
- None.
-
- :raises:
- UserNotDeletedError.
-
- """
- request = self.adelete_user(uid)
- result = self.aresp(request)
- return result
-
- def create_item(self, iid, itypes, params={}):
- """Blocking request to create item
-
- :param iid: item id. type str.
- :param itypes: item types. Tuple of Str.
- For example, if this item belongs to item types "t1", "t2",
- "t3", "t4", then itypes=("t1", "t2", "t3", "t4").
- NOTE: if this item belongs to only one itype, use tuple of one
- element, eg. itypes=("t1",)
- :param params: optional attributes. type dictionary.
- For example,
- { 'custom': 'value', 'pio_inactive' : True,
- 'pio_latlng': [4.5,67.8] }
-
- :returns:
- None
-
- :raises:
- ItemNotCreatedError
-
- """
- request = self.acreate_item(iid, itypes, params)
- result = self.aresp(request)
- return result
-
- def get_item(self, iid):
- """Blocking request to get item
-
- :param iid: item id. type str.
-
- :returns:
- item data in dictionary format.
-
- :raises:
- ItemNotFoundError.
-
- """
- request = self.aget_item(iid)
- result = self.aresp(request)
- return result
-
- def delete_item(self, iid):
- """Blocking request to delete item
-
- :param iid: item id. type str.
-
- :returns:
- None
-
- :raises:
- ItemNotDeletedError
-
- """
- request = self.adelete_item(iid)
- result = self.aresp(request)
- return result
-
- def get_itemrec_topn(self, engine, n, params={}):
- """Blocking request to get recommendations for the identified user
-
- :param engine: name of the prediction engine. type str.
- :param n: number of recommendation. type int.
- :param params: optional parameters. type dictionary
- For example, { 'pio_itypes' : ("t1", "t2") }
- :returns:
- data in dictionary format.
-
- :raises:
- ItemRecNotFoundError.
- """
- request = self.aget_itemrec_topn(engine, n, params)
- result = self.aresp(request)
- return result
-
- def get_itemrec(self, uid, n, engine, **params):
- """Deprecated. Blocking request to get recommendations
-
- :param uid: user id. type str or int.
- :param n: number of recommendation. type int.
- :param engine: name of the prediction engine. type str.
- :param params: keyword arguments for optional attributes.
- For example, pio_latlng=[123.4, 56.7]
-
- :returns:
- data in dictionary format.
-
- :raises:
- ItemRecNotFoundError.
-
- """
- request = self.aget_itemrec(uid, n, engine, **params)
- result = self.aresp(request)
- return result
-
- def get_itemsim_topn(self, engine, iid, n, params={}):
- """Blocking request to get top n similar items of the item
-
- :param engine: name of the prediction engine. type str.
- :param iid: item id. type str.
- :param n: number of similar items. type int.
- :param params: optional parameters. type dictionary
- For example, { 'pio_itypes' : ("t1",) }
- :returns:
- data in dictionary format.
-
- :raises:
- ItemSimNotFoundError.
- """
-
- request = self.aget_itemsim_topn(engine, iid, n, params)
- result = self.aresp(request)
- return result
-
- def get_itemrank_ranked(self, engine, iids, params={}):
- """Blocking request to get ranked item for user
-
- :param engine: name of the prediction engine. type str.
- :param iids: items to be ranked. type list of item ids.
- For example, ["i0", "i1", "i2"]
- :param params: optional parameters. type dictionary
- For example. { 'pio_attributes' : "name" }
- :returns:
- data in dictionary format.
-
- :raises:
- ItemRankNotFoundError.
- """
- request = self.aget_itemrank_ranked(engine, iids, params)
- result = self.aresp(request)
- return result
-
- def record_action_on_item(self, action, iid, params={}):
- """Blocking request to create action on an item
-
- :param action: action name. type String. For example, "rate", "like",
- etc
- :param iid: item id. type str.
- :param params: optional attributes. type dictionary.
- For example, { 'pio_rate' : 4, 'pio_latlng' : [1.23,4.56] }
- NOTE: For "rate" action, pio_rate attribute is required.
- integer value of 1-5 (1 is least preferred and 5 is most
- preferred)
-
- :returns:
- None
-
- :raises:
- U2IActionNotCreatedError
- """
- request = self.arecord_action_on_item(action, iid, params)
- result = self.aresp(request)
- return result
-
- def user_conversion_item(self, uid, iid, **params):
- """Deprecated. Blocking request to create user conversion action on an
- item
-
- :param uid: user id. type str.
- :param iid: item id. type str.
- :param params: keyword arguments for optional attributes.
- For example, pio_latlng=[123.4, 56.7]
-
- :returns:
- None
-
- :raises:
- U2IActionNotCreatedError
- """
- request = self.auser_conversion_item(uid, iid, **params)
- result = self.aresp(request)
- return result
-
- def user_dislike_item(self, uid, iid, **params):
- """Deprecated. Blocking request to create user dislike action on an
- item
-
- :param uid: user id. type str.
- :param iid: item id. type str.
- :param params: keyword arguments for optional attributes.
- For example, pio_latlng=[123.4, 56.7]
-
- :returns:
- None
-
- :raises:
- U2IActionNotCreatedError
- """
- request = self.auser_dislike_item(uid, iid, **params)
- result = self.aresp(request)
- return result
-
- def user_like_item(self, uid, iid, **params):
- """Deprecated. Blocking request to create user like action on an item
-
- :param uid: user id. type str.
- :param iid: item id. type str.
- :param params: keyword arguments for optional attributes.
- For example, pio_latlng=[123.4, 56.7]
-
- :returns:
- None
-
- :raises:
- U2IActionNotCreatedError
- """
- request = self.auser_like_item(uid, iid, **params)
- result = self.aresp(request)
- return result
-
- def user_rate_item(self, uid, iid, rate, **params):
- """Deprecated. Blocking request to create user rate action on an item
-
- :param uid: user id. type str.
- :param iid: item id. type str.
- :param rate: rating. integer value of 1-5 (1 is least preferred and 5
- is most preferred)
- :param params: keyword arguments for optional attributes.
- For example, pio_latlng=[123.4, 56.7]
-
- :returns:
- None
-
- :raises:
- U2IActionNotCreatedError
- """
- request = self.auser_rate_item(uid, iid, rate, **params)
- result = self.aresp(request)
- return result
-
- def user_view_item(self, uid, iid, **params):
- """Deprecated. Blocking request to create user view action on an item
-
- :param uid: user id. type str.
- :param iid: item id. type str.
- :param params: keyword arguments for optional attributes.
- For example, pio_latlng=[123.4, 56.7]
-
- :returns:
- None
-
- :raises:
- U2IActionNotCreatedError
- """
- request = self.auser_view_item(uid, iid, **params)
- result = self.aresp(request)
- return result
+ return self.asend_query(data).get_response()
diff --git a/predictionio/connection.py b/predictionio/connection.py
index e124c5b..9609047 100644
--- a/predictionio/connection.py
+++ b/predictionio/connection.py
@@ -20,6 +20,7 @@
from urllib.parse import urlencode
import datetime
+import json
import logging
# use generators for python2 and python3
@@ -100,44 +101,43 @@
self.response_q.put(response)
def get_response(self):
- """get the response
+ """Get the response. Blocking.
+ :returns: self.rfunc's return type.
"""
if self._response is None:
- self._response = self.response_q.get(True) # NOTE: blocking
+ tmp_response = self.response_q.get(True) # NOTE: blocking
+ if self.rfunc is None:
+ self._response = tmp_response
+ else:
+ self._response = self.rfunc(tmp_response)
return self._response
class AsyncResponse(object):
+ """Store the response of asynchronous request
- """AsyncResponse object.
-
- Store the response of asynchronous request
-
- when get the response, user should check if error is None (which means no
- Exception happens)
- if error is None, then should check if the status is expected
-
- Attributes:
- error: exception object if any happens
- version: int
- status: int
- reason: str
- headers: dict
- body: str (NOTE: not necessarily can be converted to JSON,
- eg, for GET request to /v1/status)
- request: the corresponding AsyncRequest object
+ After getting the response, first check whether error is None (which means
+ no exception happened).
+ If error is None, then check whether the status is the expected one.
"""
def __init__(self):
+ #: exception object if any happens
self.error = None
+
self.version = None
self.status = None
self.reason = None
+ #: Response header. str
self.headers = None
+ #: Response body. str
self.body = None
- self.request = None # point back to the request object
+ #: Jsonified response body. Remains None if conversion is unsuccessful.
+ self.json_body = None
+ #: Point back to the AsyncRequest object
+ self.request = None
def __str__(self):
return "e:%s v:%s s:%s r:%s h:%s b:%s" % (self.error, self.version,
@@ -150,6 +150,11 @@
self.reason = reason
self.headers = headers
self.body = body
+ # Try to extract the json; json_body stays None when body is not JSON.
+ try:
+     self.json_body = json.loads(body)
+ except (TypeError, ValueError):  # tuple form parses on Python 2 and 3
+     self.json_body = None
def set_error(self, error):
self.error = error
@@ -194,9 +199,12 @@
mod_headers["Connection"] = "keep-alive"
enc_body = None
if body: # if body is not empty
- enc_body = urlencode(body)
+ #enc_body = urlencode(body)
+ #mod_headers[
+ # "Content-type"] = "application/x-www-form-urlencoded"
+ enc_body = json.dumps(body)
mod_headers[
- "Content-type"] = "application/x-www-form-urlencoded"
+ "Content-type"] = "application/json"
#mod_headers["Accept"] = "text/plain"
except Exception as e:
response.set_error(e)
diff --git a/predictionio/obsolete.py b/predictionio/obsolete.py
new file mode 100644
index 0000000..dc19a09
--- /dev/null
+++ b/predictionio/obsolete.py
@@ -0,0 +1,1205 @@
+# import packages
+import re
+try:
+ import httplib
+except ImportError:
+ # pylint: disable=F0401
+ # http is a Python3 module, replacing httplib
+ from http import client as httplib
+import json
+import urllib
+import warnings
+
+from predictionio.connection import Connection
+from predictionio.connection import AsyncRequest
+from predictionio.connection import PredictionIOAPIError
+
+"""Error exception defined for this API
+
+Should be handled by the user
+"""
+
+
+class ServerStatusError(PredictionIOAPIError):
+
+ "Error happened when tried to get status of the API server"
+ pass
+
+
+class UserNotCreatedError(PredictionIOAPIError):
+
+ "Error happened when tried to create user"
+ pass
+
+
+class UserNotFoundError(PredictionIOAPIError):
+
+ "Error happened when tried to get user"
+ pass
+
+
+class UserNotDeletedError(PredictionIOAPIError):
+
+ "Error happened when tried to delete user"
+ pass
+
+
+class ItemNotCreatedError(PredictionIOAPIError):
+
+ "Error happened when tried to create item"
+ pass
+
+
+class ItemNotFoundError(PredictionIOAPIError):
+
+ "Error happened when tried to get item"
+ pass
+
+
+class ItemNotDeletedError(PredictionIOAPIError):
+
+ "Error happened when tried to delete item"
+ pass
+
+
+class U2IActionNotCreatedError(PredictionIOAPIError):
+
+ "Error happened when tried to create user-to-item action"
+ pass
+
+
+class ItemRecNotFoundError(PredictionIOAPIError):
+
+ "Error happened when tried to get item recommendation"
+ pass
+
+
+class ItemSimNotFoundError(PredictionIOAPIError):
+
+ "Error happened when tried to get similar items"
+ pass
+
+class ItemRankNotFoundError(PredictionIOAPIError):
+
+ "Error happened when tried to rank item"
+ pass
+
+class InvalidArgumentError(PredictionIOAPIError):
+
+ "Arguments are not valid"
+ pass
+
+# map to API
+LIKE_API = "like"
+DISLIKE_API = "dislike"
+VIEW_API = "view"
+CONVERSION_API = "conversion"
+RATE_API = "rate"
+
+
+class Client(object):
+
+ """PredictionIO client object.
+
+ This is an object representing a PredictionIO's client. This object
+ provides methods for making PredictionIO API requests.
+
+ :param appkey: the App Key provided by PredictionIO.
+ :param threads: number of threads to handle PredictionIO API requests.
+ Must be >= 1.
+ :param apiurl: the PredictionIO API URL path.
+ :param apiversion: the PredictionIO API version. (optional)
+ (eg. "", or "/v1")
+ :param qsize: the max size of the request queue (optional).
+ The asynchronous request becomes blocking once this size has been
+ reached, until the queued requests are handled.
+ Default value is 0, which means infinite queue size.
+ :param timeout: timeout for HTTP connection attempts and requests in
+ seconds (optional).
+ Default value is 5.
+
+ """
+
+ def __init__(self, appkey, threads=1, apiurl="http://localhost:8000",
+ apiversion="", qsize=0, timeout=5):
+ """Constructor of Client object.
+
+ """
+ warnings.warn("predictionio.Client is deprecated. " +
+ "Please consider upgrading to our latest version.")
+
+ self.appkey = appkey
+ self.threads = threads
+ self.apiurl = apiurl
+ self.apiversion = apiversion
+ self.qsize = qsize
+ self.timeout = timeout
+
+ # check connection type
+ https_pattern = r'^https://(.*)'
+ http_pattern = r'^http://(.*)'
+ m = re.match(https_pattern, apiurl)
+ self.https = True
+ if m is None: # not matching https
+ m = re.match(http_pattern, apiurl)
+ self.https = False
+ if m is None: # not matching http either
+ raise InvalidArgumentError("apiurl is not valid: %s" % apiurl)
+ self.host = m.group(1)
+
+ self._uid = None # identified uid
+ self._connection = Connection(host=self.host, threads=self.threads,
+ qsize=self.qsize, https=self.https,
+ timeout=self.timeout)
+
+ def close(self):
+ """Close this client and the connection.
+
+ Call this method when you want to completely terminate the connection
+ with PredictionIO.
+ It will wait for all pending requests to finish.
+ """
+ self._connection.close()
+
+ def pending_requests(self):
+ """Return the number of pending requests.
+
+ :returns:
+ The number of pending requests of this client.
+ """
+ return self._connection.pending_requests()
+
+ def identify(self, uid):
+ """Identify the uid
+
+ :param uid: user id. type str.
+ """
+ self._uid = uid
+
+ def get_status(self):
+ """Get the status of the PredictionIO API Server
+
+ :returns:
+ status message.
+
+ :raises:
+ ServerStatusError.
+ """
+ path = "/"
+ request = AsyncRequest("GET", path)
+ request.set_rfunc(self._aget_status_resp)
+ self._connection.make_request(request)
+ result = self.aresp(request)
+ return result
+
+ def _aget_status_resp(self, response):
+ """Handle the AsyncResponse of get status request"""
+ if response.error is not None:
+ raise ServerStatusError("Exception happened: %s for request %s" %
+ (response.error, response.request))
+ elif response.status != httplib.OK:
+ raise ServerStatusError("request: %s status: %s body: %s" %
+ (response.request, response.status,
+ response.body))
+
+ # data = json.loads(response.body) # convert json string to dict
+ return response.body
+
+ def acreate_user(self, uid, params={}):
+     """Asynchronously create a user.
+
+     :param uid: user id. type str.
+     :param params: optional attributes. type dictionary. It is copied
+         before use, so the caller's dictionary is left unmodified.
+         For example,
+         { 'custom': 'value', 'pio_inactive' : True,
+           'pio_latlng': [4.5,67.8] }
+
+     :returns:
+         AsyncRequest object. You should call the aresp() method using this
+         AsyncRequest object as argument to get the final result or status
+         of this asynchronous request.
+     """
+     # Copy first so the conversions below never touch the caller's dict.
+     params = dict(params)
+     if "pio_latlng" in params:
+         params["pio_latlng"] = ",".join(map(str, params["pio_latlng"]))
+     if "pio_inactive" in params:
+         params["pio_inactive"] = str(params["pio_inactive"]).lower()
+
+     path = "%s/users.json" % self.apiversion
+     request = AsyncRequest(
+         "POST", path, pio_appkey=self.appkey, pio_uid=uid, **params)
+     request.set_rfunc(self._acreate_user_resp)
+     self._connection.make_request(request)
+     return request
+
+ def _acreate_user_resp(self, response):
+ """Private function to handle the AsyncResponse of the acreate_user
+ request.
+
+ :param response: AsyncResponse object.
+
+ :returns:
+ None.
+
+ :raises:
+ UserNotCreatedError.
+
+ """
+ if response.error is not None:
+ raise UserNotCreatedError("Exception happened: %s for request %s" %
+ (response.error, response.request))
+ elif response.status != httplib.CREATED:
+ raise UserNotCreatedError("request: %s status: %s body: %s" %
+ (response.request, response.status,
+ response.body))
+
+ return None
+
+ def aget_user(self, uid):
+ """Asynchronously get user.
+
+ :param uid: user id. type str.
+
+ :returns:
+ AsyncRequest object. You should call the aresp() method using this
+ AsyncRequest object as argument to get the final result or status
+ of this asynchronous request.
+ """
+
+ enc_uid = urllib.quote(uid, "") # replace special char with %xx
+ path = "%s/users/%s.json" % (self.apiversion, enc_uid)
+ request = AsyncRequest("GET", path, pio_appkey=self.appkey)
+ request.set_rfunc(self._aget_user_resp)
+ self._connection.make_request(request)
+
+ return request
+
+ def _aget_user_resp(self, response):
+ """Private function to handle the AsyncResponse of the aget_user
+ request.
+
+ :param response: AsyncResponse object.
+
+ :returns:
+ User data in Dictionary format.
+
+ :raises:
+ UserNotFoundError.
+
+ """
+ if response.error is not None:
+ raise UserNotFoundError("Exception happened: %s for request %s" %
+ (response.error, response.request))
+ elif response.status != httplib.OK:
+ raise UserNotFoundError("request: %s status: %s body: %s" %
+ (response.request, response.status,
+ response.body))
+
+ data = json.loads(response.body) # convert json string to dict
+ return data
+
+ def adelete_user(self, uid):
+ """Asynchronously delete user.
+
+ :param uid: user id. type str.
+
+ :returns:
+ AsyncRequest object. You should call the aresp() method using this
+ AsyncRequest object as argument to get the final result or status
+ of this asynchronous request.
+ """
+
+ enc_uid = urllib.quote(uid, "") # replace special char with %xx
+ path = "%s/users/%s.json" % (self.apiversion, enc_uid)
+ request = AsyncRequest("DELETE", path, pio_appkey=self.appkey)
+ request.set_rfunc(self._adelete_user_resp)
+ self._connection.make_request(request)
+
+ return request
+
+ def _adelete_user_resp(self, response):
+ """Private function to handle the AsyncResponse of the adelete_user
+ request.
+
+ :param response: AsyncResponse object.
+
+ :returns:
+ None.
+
+ :raises:
+ UserNotDeletedError.
+
+ """
+ if response.error is not None:
+ raise UserNotDeletedError("Exception happened: %s for request %s" %
+ (response.error, response.request))
+ elif response.status != httplib.OK:
+ raise UserNotDeletedError("request: %s status: %s body: %s" %
+ (response.request, response.status,
+ response.body))
+ return None
+
+ def acreate_item(self, iid, itypes, params={}):
+     """Asynchronously create item.
+
+     :param iid: item id. type str.
+     :param itypes: item types. Tuple of Str.
+         For example, if this item belongs to item types "t1", "t2",
+         "t3", "t4", then itypes=("t1", "t2", "t3", "t4").
+         NOTE: if this item belongs to only one itype, use tuple of one
+         element, eg. itypes=("t1",)
+     :param params: optional attributes. type dictionary.
+         For example,
+         { 'custom': 'value', 'pio_inactive' : True,
+           'pio_latlng': [4.5,67.8] }
+     :returns:
+         AsyncRequest object. You should call the aresp() method using this
+         AsyncRequest object as argument to get the final result or status
+         of this asynchronous request.
+     """
+     itypes_str = ",".join(itypes)  # join items with ","
+     params = dict(params)  # copy: leave the caller's dict untouched
+     if "pio_latlng" in params:
+         params["pio_latlng"] = ",".join(map(str, params["pio_latlng"]))
+     if "pio_inactive" in params:
+         params["pio_inactive"] = str(params["pio_inactive"]).lower()
+
+     path = "%s/items.json" % self.apiversion
+     request = AsyncRequest("POST", path, pio_appkey=self.appkey,
+                            pio_iid=iid, pio_itypes=itypes_str, **params)
+     request.set_rfunc(self._acreate_item_resp)
+     self._connection.make_request(request)
+     return request
+
+ def _acreate_item_resp(self, response):
+ """Private function to handle the AsyncResponse of the acreate_item
+ request
+
+ :param response: AsyncResponse object.
+
+ :returns:
+ None
+ :raises:
+ ItemNotCreatedError
+
+ """
+ if response.error is not None:
+ raise ItemNotCreatedError("Exception happened: %s for request %s" %
+ (response.error, response.request))
+ elif response.status != httplib.CREATED:
+ raise ItemNotCreatedError("request: %s status: %s body: %s" %
+ (response.request, response.status,
+ response.body))
+ return None
+
+ def aget_item(self, iid):
+ """Asynchronously get item
+
+ :param iid: item id. type str.
+
+ :returns:
+ AsyncRequest object. You should call the aresp() method using this
+ AsyncRequest object as argument to get the final result or status
+ of this asynchronous request.
+ """
+ enc_iid = urllib.quote(iid, "")
+ path = "%s/items/%s.json" % (self.apiversion, enc_iid)
+ request = AsyncRequest("GET", path, pio_appkey=self.appkey)
+ request.set_rfunc(self._aget_item_resp)
+ self._connection.make_request(request)
+ return request
+
+ def _aget_item_resp(self, response):
+ """Private function to handle the AsyncResponse of the aget_item
+ request
+
+ :param response: AsyncResponse object.
+
+ :returns:
+ item data in dictionary format.
+
+ :raises:
+ ItemNotFoundError.
+
+ """
+ if response.error is not None:
+ raise ItemNotFoundError("Exception happened: %s for request %s" %
+ (response.error, response.request))
+ elif response.status != httplib.OK:
+ raise ItemNotFoundError("request: %s status: %s body: %s" %
+ (response.request, response.status,
+ response.body))
+
+ data = json.loads(response.body) # convert json string to dict
+ if "pio_itypes" in data:
+ # convert from list to tuple
+ data["pio_itypes"] = tuple(data["pio_itypes"])
+
+ return data
+
+ def adelete_item(self, iid):
+ """Asynchronously delete item
+
+ :param iid: item id. type str.
+
+ :returns:
+ AsyncRequest object. You should call the aresp() method using this
+ AsyncRequest object as argument to get the final result or status
+ of this asynchronous request.
+ """
+
+ enc_iid = urllib.quote(iid, "")
+ path = "%s/items/%s.json" % (self.apiversion, enc_iid)
+ request = AsyncRequest("DELETE", path, pio_appkey=self.appkey)
+ request.set_rfunc(self._adelete_item_resp)
+ self._connection.make_request(request)
+ return request
+
+ def _adelete_item_resp(self, response):
+ """Private function to handle the AsyncResponse of the adelete_item
+ request
+
+ :param response: AsyncResponse object
+
+ :returns:
+ None
+
+ :raises:
+ ItemNotDeletedError
+ """
+ if response.error is not None:
+ raise ItemNotDeletedError("Exception happened: %s for request %s" %
+ (response.error, response.request))
+ elif response.status != httplib.OK:
+ raise ItemNotDeletedError("request: %s status: %s body: %s" %
+ (response.request, response.status,
+ response.body))
+ return None
+
+ def _aget_user_itemrec_topn(self, engine, uid, n, params={}):
+     """Private function to asynchronously get recommendations for user
+
+     :param engine: name of the prediction engine. type str.
+     :param uid: user id. type str.
+     :param n: number of recommendation. type int.
+     :param params: optional parameters. type dictionary
+         For example, { 'pio_itypes' : ("t1","t2") }
+     :returns:
+         AsyncRequest object. You should call the aresp() method using this
+         AsyncRequest object as argument to get the final result or status
+         of this asynchronous request.
+     """
+     params = dict(params)  # copy: leave the caller's dict untouched
+     for key in ("pio_itypes", "pio_attributes"):
+         if key in params:
+             params[key] = ",".join(params[key])
+     if "pio_latlng" in params:
+         params["pio_latlng"] = ",".join(map(str, params["pio_latlng"]))
+
+     path = "%s/engines/itemrec/%s/topn.json" % (self.apiversion, engine)
+     request = AsyncRequest("GET", path, pio_appkey=self.appkey,
+                            pio_uid=uid, pio_n=n, **params)
+     request.set_rfunc(self._aget_user_itemrec_topn_resp)
+     self._connection.make_request(request)
+     return request
+
+ def _aget_user_itemrec_topn_resp(self, response):
+ """Private function to handle the AsyncResponse of the aget_itemrec
+ request
+
+ :param response: AsyncResponse object
+
+ :returns:
+ data in dictionary format.
+
+ :raises:
+ ItemRecNotFoundError.
+ """
+ if response.error is not None:
+ raise ItemRecNotFoundError(
+ "Exception happened: %s for request %s" %
+ (response.error, response.request))
+ elif response.status != httplib.OK:
+ raise ItemRecNotFoundError("request: %s status: %s body: %s" %
+ (response.request, response.status,
+ response.body))
+
+ data = json.loads(response.body) # convert json string to dict
+ return data
+
+ def aget_itemrec_topn(self, engine, n, params={}):
+ """Asynchronously get recommendations for the identified user
+
+ :param engine: name of the prediction engine. type str.
+ :param n: number of recommendation. type int.
+ :param params: optional parameters. type dictionary
+ For example, { 'pio_itypes' : ("t1",) }
+ :returns:
+ AsyncRequest object. You should call the aresp() method using this
+ AsyncRequest object as argument to get the final result or status
+ of this asynchronous request.
+ """
+
+ if self._uid is None:
+ raise InvalidArgumentError(
+ "uid is not identified. Please call identify(uid) first.")
+
+ request = self._aget_user_itemrec_topn(engine, self._uid, n, params)
+ return request
+
+ def aget_itemrec(self, uid, n, engine, **params):
+ """Deprecated. Asynchronously get recommendations
+
+ :param uid: user id. type str.
+ :param n: number of recommendation. type int.
+ :param engine: name of the prediction engine. type str.
+ :param params: keyword arguments for optional attributes.
+ For example, pio_latlng=[123.4, 56.7]
+ :returns:
+ AsyncRequest object. You should call the aresp() method using this
+ AsyncRequest object as argument to get the final result or status
+ of this asynchronous request.
+ """
+ request = self._aget_user_itemrec_topn(engine, uid, n, params)
+ return request
+
+ def _aget_itemsim_topn(self, engine, iid, n, params={}):
+     """Private function to asynchronously get top n similar items of the
+     item
+
+     :param engine: name of the prediction engine. type str.
+     :param iid: item id. type str.
+     :param n: number of similar items. type int.
+     :param params: optional parameters. type dictionary
+         For example, { 'pio_itypes' : ("t1","t2") }
+     :returns:
+         AsyncRequest object. You should call the aresp() method using this
+         AsyncRequest object as argument to get the final result or status
+         of this asynchronous request.
+     """
+     params = dict(params)  # copy: leave the caller's dict untouched
+     for key in ("pio_itypes", "pio_attributes"):
+         if key in params:
+             params[key] = ",".join(params[key])
+     if "pio_latlng" in params:
+         params["pio_latlng"] = ",".join(map(str, params["pio_latlng"]))
+
+     path = "%s/engines/itemsim/%s/topn.json" % (self.apiversion, engine)
+     request = AsyncRequest("GET", path, pio_appkey=self.appkey,
+                            pio_iid=iid, pio_n=n, **params)
+     request.set_rfunc(self._aget_itemsim_topn_resp)
+     self._connection.make_request(request)
+     return request
+
+ def _aget_itemsim_topn_resp(self, response):
+ """Private function to handle the AsyncResponse of the aget_itemsim
+ request
+
+ :param response: AsyncResponse object
+
+ :returns:
+ data in dictionary format.
+
+ :raises:
+ ItemSimNotFoundError.
+ """
+ if response.error is not None:
+ raise ItemSimNotFoundError(
+ "Exception happened: %s for request %s" %
+ (response.error, response.request))
+ elif response.status != httplib.OK:
+ raise ItemSimNotFoundError("request: %s status: %s body: %s" %
+ (response.request, response.status,
+ response.body))
+
+ data = json.loads(response.body) # convert json string to dict
+ return data
+
+ def aget_itemsim_topn(self, engine, iid, n, params={}):
+ """Asynchronously get top n similar items of the item
+
+ :param engine: name of the prediction engine. type str.
+ :param iid: item id. type str.
+ :param n: number of similar items. type int.
+ :param params: optional parameters. type dictionary
+ For example, { 'pio_itypes' : ("t1",) }
+ :returns:
+ AsyncRequest object. You should call the aresp() method using this
+ AsyncRequest object as argument to get the final result or status
+ of this asynchronous request.
+ """
+
+ request = self._aget_itemsim_topn(engine, iid, n, params)
+ return request
+
+ def _aget_user_itemrank_ranked(self, engine, uid, iids, params={}):
+     """Private function to asynchronously get ranked item for user
+
+     :param engine: name of the prediction engine. type str.
+     :param uid: user id. type str.
+     :param iids: items to be ranked. type list of item ids.
+         For example, ["i0", "i1", "i2"]
+     :param params: optional parameters. type dictionary
+         For example, { 'pio_attributes' : ["name"] }
+     :returns:
+         AsyncRequest object. You should call the aresp() method using this
+         AsyncRequest object as argument to get the final result or status
+         of this asynchronous request.
+     """
+     params = dict(params)  # copy: leave the caller's dict untouched
+     if "pio_attributes" in params:
+         params["pio_attributes"] = ",".join(params["pio_attributes"])
+     pio_iids = ",".join(iids)
+
+     path = "%s/engines/itemrank/%s/ranked.json" % \
+         (self.apiversion, engine)
+     request = AsyncRequest("GET", path, pio_appkey=self.appkey,
+                            pio_uid=uid, pio_iids=pio_iids, **params)
+     request.set_rfunc(self._aget_user_itemrank_ranked_resp)
+     self._connection.make_request(request)
+     return request
+
+ def _aget_user_itemrank_ranked_resp(self, response):
+ """Private function to handle the AsyncResponse of the
+ aget_itemrank_ranked request
+
+ :param response: AsyncResponse object
+
+ :returns:
+ data in dictionary format.
+
+ :raises:
+ ItemRankNotFoundError.
+ """
+ if response.error is not None:
+ raise ItemRankNotFoundError(
+ "Exception happened: %s for request %s" %
+ (response.error, response.request))
+ elif response.status != httplib.OK:
+ raise ItemRankNotFoundError("request: %s status: %s body: %s" %
+ (response.request, response.status,
+ response.body))
+
+ data = json.loads(response.body) # convert json string to dict
+ return data
+
+ def aget_itemrank_ranked(self, engine, iids, params={}):
+ """Asynchronously get ranked item for user
+
+ :param engine: name of the prediction engine. type str.
+ :param iids: items to be ranked. type list of item ids.
+ For example, ["i0", "i1", "i2"]
+ :param params: optional parameters. type dictionary
+ For example, { 'pio_attributes' : ["name"] }
+ :returns:
+ AsyncRequest object. You should call the aresp() method using this
+ AsyncRequest object as argument to get the final result or status
+ of this asynchronous request.
+ """
+
+ if self._uid is None:
+ raise InvalidArgumentError(
+ "uid is not identified. Please call identify(uid) first.")
+
+ request = self._aget_user_itemrank_ranked(engine,
+ self._uid, iids, params)
+ return request
+
+ def _auser_action_on_item(self, action, uid, iid, params):
+     """Private function to asynchronously create a user action on an item
+
+     :param action: action type. type str. ("like", "dislike", "conversion",
+         "rate", "view")
+     :param uid: user id. type str or int.
+     :param iid: item id. type str or int.
+     :param params: optional attributes. type dictionary.
+         For example, { 'pio_rate' : 4, 'pio_latlng' : [1.23,4.56] }
+         NOTE: For "rate" action, pio_rate attribute is required.
+         integer value of 1-5 (1 is least preferred and 5 is most
+         preferred)
+
+     :returns:
+         AsyncRequest object. You should call the aresp() method using this
+         AsyncRequest object as argument to get the final result or status
+         of this asynchronous request.
+     """
+     params = dict(params)  # copy: leave the caller's dict untouched
+     if "pio_latlng" in params:
+         params["pio_latlng"] = ",".join(map(str, params["pio_latlng"]))
+     path = "%s/actions/u2i.json" % (self.apiversion)
+     request = AsyncRequest("POST", path, pio_appkey=self.appkey,
+                            pio_action=action, pio_uid=uid, pio_iid=iid,
+                            **params)
+     request.set_rfunc(self._auser_action_on_item_resp)
+     self._connection.make_request(request)
+     return request
+
+ def _auser_action_on_item_resp(self, response):
+ """Private function to handle the AsyncResponse of the
+ _auser_action_on_item request
+
+ :param response: AsyncResponse object
+
+ :returns:
+ None
+
+ :raises:
+ U2IActionNotCreatedError
+ """
+ if response.error is not None:
+ raise U2IActionNotCreatedError(
+ "Exception happened: %s for request %s" %
+ (response.error, response.request))
+ elif response.status != httplib.CREATED:
+ raise U2IActionNotCreatedError("request: %s status: %s body: %s" %
+ (response.request, response.status,
+ response.body))
+ return None
+
+ def arecord_action_on_item(self, action, iid, params={}):
+ """Asynchronously create action on item
+
+ :param action: action name. type String. For example, "rate", "like",
+ etc
+ :param iid: item id. type str or int.
+ :param params: optional attributes. type dictionary.
+ For example, { 'pio_rate' : 4, 'pio_latlng': [4.5,67.8] }
+ NOTE: For "rate" action, pio_rate attribute is required.
+ integer value of 1-5 (1 is least preferred and 5 is most
+ preferred)
+
+ :returns:
+ AsyncRequest object. You should call the aresp() method using this
+ AsyncRequest object as argument to get the final result or status
+ of this asynchronous request.
+
+ :raises:
+ U2IActionNotCreatedError
+ """
+
+ if self._uid is None:
+ raise InvalidArgumentError(
+ "uid is not identified. Please call identify(uid) first.")
+
+ request = self._auser_action_on_item(action, self._uid, iid, params)
+ return request
+
+ def auser_conversion_item(self, uid, iid, **params):
+ """Deprecated. Asynchronously create an user conversion action on an
+ item
+
+ :param uid: user id. type str.
+ :param iid: item id. type str.
+ :param params: keyword arguments for optional attributes.
+ For example, pio_latlng=[123.4, 56.7]
+ :returns:
+ AsyncRequest object. You should call the aresp() method using this
+ AsyncRequest object as argument to get the final result or status
+ of this asynchronous request.
+ """
+ request = self._auser_action_on_item(CONVERSION_API, uid, iid, params)
+ return request
+
+ def auser_dislike_item(self, uid, iid, **params):
+ """Deprecated. Asynchronously create an user dislike action on an item
+
+ :param uid: user id. type str.
+ :param iid: item id. type str.
+ :param params: keyword arguments for optional attributes.
+ For example, pio_latlng=[123.4, 56.7]
+ :returns:
+ AsyncRequest object. You should call the aresp() method using this
+ AsyncRequest object as argument to get the final result or status
+ of this asynchronous request.
+ """
+ request = self._auser_action_on_item(DISLIKE_API, uid, iid, params)
+ return request
+
+ def auser_like_item(self, uid, iid, **params):
+ """Deprecated. Asynchronously create an user like action on an item
+
+ :param uid: user id. type str.
+ :param iid: item id. type str.
+ :param params: keyword arguments for optional attributes.
+ For example, pio_latlng=[123.4, 56.7]
+ :returns:
+ AsyncRequest object. You should call the aresp() method using this
+ AsyncRequest object as argument to get the final result or status
+ of this asynchronous request.
+ """
+ request = self._auser_action_on_item(LIKE_API, uid, iid, params)
+ return request
+
+ def auser_rate_item(self, uid, iid, rate, **params):
+ """Deprecated. Asynchronously create an user rate action on an item
+
+ :param uid: user id. type str.
+ :param iid: item id. type str.
+ :param rate: rating. integer value of 1-5 (1 is least preferred and 5
+ is most preferred)
+ :param params: keyword arguments for optional attributes.
+ For example, pio_latlng=[123.4, 56.7]
+ :returns:
+ AsyncRequest object. You should call the aresp() method using this
+ AsyncRequest object as argument to get the final result or status
+ of this asynchronous request.
+ """
+
+ params['pio_rate'] = rate
+ request = self._auser_action_on_item(RATE_API, uid, iid, params)
+ return request
+
+ def auser_view_item(self, uid, iid, **params):
+ """Deprecated. Asynchronously create an user view action on an item
+
+ :param uid: user id. type str.
+ :param iid: item id. type str.
+ :param params: keyword arguments for optional attributes.
+ For example, pio_latlng=[123.4, 56.7]
+ :returns:
+ AsyncRequest object. You should call the aresp() method using this
+ AsyncRequest object as argument to get the final result or status
+ of this asynchronous request.
+ """
+ request = self._auser_action_on_item(VIEW_API, uid, iid, params)
+ return request
+
+ def aresp(self, request):
+     """Get the result of the asynchronous request
+
+     :param request: AsyncRequest object. This object must be returned by
+         the asynchronous request function
+         For example, to get the result of an aget_user()
+         request, call this aresp() with the argument of
+         AsyncRequest object returned by aget_user().
+
+     :returns:
+         The result of this AsyncRequest. The return type is the same as
+         the return type of corresponding blocking request.
+
+         For example,
+
+         Calling aresp() with acreate_user() AsyncRequest returns the same
+         type as create_user(), which is None.
+
+         Calling aresp() with aget_user() AsyncRequest returns the same
+         type as get_user(), which is dictionary data.
+
+     :raises:
+         Exception may be raised if an error happened. The type of
+         exception is the same as exception type of the corresponding
+         blocking request.
+
+         For example,
+
+         Calling aresp() with acreate_user() AsyncRequest may raise
+         UserNotCreatedError exception.
+
+         Calling aresp() with aget_user() AsyncRequest may raise
+         UserNotFoundError exception.
+
+     """
+     # get_response() already runs request.rfunc on the raw AsyncResponse, so
+     # its return value is the final result; it must not be fed to rfunc again.
+     return request.get_response()
+
+ def create_user(self, uid, params={}):
+ """Blocking request to create user
+
+ :param uid: user id. type str.
+ :param params: optional attributes. type dictionary.
+ For example, { 'custom': 'value', 'pio_inactive' : True,
+ 'pio_latlng': [4.5,67.8] }
+
+ :returns:
+ None.
+
+ :raises:
+ UserNotCreatedError.
+
+ """
+ request = self.acreate_user(uid, params)
+ result = self.aresp(request)
+ return result
+
+ def get_user(self, uid):
+ """Blocking request to get user
+
+ :param uid: user id. type str or int.
+
+ :returns:
+ User data in Dictionary format.
+
+ :raises:
+ UserNotFoundError.
+
+ """
+ request = self.aget_user(uid)
+ result = self.aresp(request)
+ return result
+
+ def delete_user(self, uid):
+ """Blocking request to delete the user
+
+ :param uid: user id. type str.
+
+ :returns:
+ None.
+
+ :raises:
+ UserNotDeletedError.
+
+ """
+ request = self.adelete_user(uid)
+ result = self.aresp(request)
+ return result
+
+ def create_item(self, iid, itypes, params={}):
+ """Blocking request to create item
+
+ :param iid: item id. type str.
+ :param itypes: item types. Tuple of Str.
+ For example, if this item belongs to item types "t1", "t2",
+ "t3", "t4", then itypes=("t1", "t2", "t3", "t4").
+ NOTE: if this item belongs to only one itype, use tuple of one
+ element, eg. itypes=("t1",)
+ :param params: optional attributes. type dictionary.
+ For example,
+ { 'custom': 'value', 'pio_inactive' : True,
+ 'pio_latlng': [4.5,67.8] }
+
+ :returns:
+ None
+
+ :raises:
+ ItemNotCreatedError
+
+ """
+ request = self.acreate_item(iid, itypes, params)
+ result = self.aresp(request)
+ return result
+
+    def get_item(self, iid):
+        """Fetch an item's data, blocking until the response is available.
+
+        :param iid: id of the item to look up. type str.
+
+        :returns:
+            item data in dictionary format.
+
+        :raises:
+            ItemNotFoundError.
+
+        """
+        # Fire the asynchronous request, then wait for its outcome.
+        async_request = self.aget_item(iid)
+        return self.aresp(async_request)
+
+    def delete_item(self, iid):
+        """Delete an item, blocking until the server has responded.
+
+        :param iid: id of the item to delete. type str.
+
+        :returns:
+            None
+
+        :raises:
+            ItemNotDeletedError
+
+        """
+        # Fire the asynchronous request, then wait for its outcome.
+        async_request = self.adelete_item(iid)
+        return self.aresp(async_request)
+
+    def get_itemrec_topn(self, engine, n, params={}):
+        """Blocking fetch of the top n recommendations for the identified user
+
+        :param engine: name of the prediction engine. type str.
+        :param n: number of recommendations to return. type int.
+        :param params: optional parameters. type dictionary.
+            For example, { 'pio_itypes' : ("t1", "t2") }
+        :returns:
+            data in dictionary format.
+
+        :raises:
+            ItemRecNotFoundError.
+        """
+        # Fire the asynchronous request, then wait for its outcome.
+        async_request = self.aget_itemrec_topn(engine, n, params)
+        return self.aresp(async_request)
+
+    def get_itemrec(self, uid, n, engine, **params):
+        """Deprecated. Blocking fetch of recommendations for a user.
+
+        :param uid: user id. type str or int.
+        :param n: number of recommendations to return. type int.
+        :param engine: name of the prediction engine. type str.
+        :param params: optional attributes, given as keyword arguments.
+            For example, pio_latlng=[123.4, 56.7]
+
+        :returns:
+            data in dictionary format.
+
+        :raises:
+            ItemRecNotFoundError.
+
+        """
+        # Fire the asynchronous request, then wait for its outcome.
+        async_request = self.aget_itemrec(uid, n, engine, **params)
+        return self.aresp(async_request)
+
+    def get_itemsim_topn(self, engine, iid, n, params={}):
+        """Blocking fetch of the top n items similar to the given item.
+
+        :param engine: name of the prediction engine. type str.
+        :param iid: id of the reference item. type str.
+        :param n: number of similar items to return. type int.
+        :param params: optional parameters. type dictionary.
+            For example, { 'pio_itypes' : ("t1",) }
+
+        :returns:
+            data in dictionary format.
+
+        :raises:
+            ItemSimNotFoundError.
+        """
+        # Fire the asynchronous request, then wait for its outcome.
+        async_request = self.aget_itemsim_topn(engine, iid, n, params)
+        return self.aresp(async_request)
+
+    def get_itemrank_ranked(self, engine, iids, params={}):
+        """Blocking fetch of the given items, ranked for the user.
+
+        :param engine: name of the prediction engine. type str.
+        :param iids: ids of the items to be ranked. type list.
+            For example, ["i0", "i1", "i2"]
+        :param params: optional parameters. type dictionary.
+            For example, { 'pio_attributes' : "name" }
+        :returns:
+            data in dictionary format.
+
+        :raises:
+            ItemRankNotFoundError.
+        """
+        # Fire the asynchronous request, then wait for its outcome.
+        async_request = self.aget_itemrank_ranked(engine, iids, params)
+        return self.aresp(async_request)
+
+    def record_action_on_item(self, action, iid, params={}):
+        """Record an action on an item, blocking until the server responds.
+
+        :param action: name of the action. type str. For example, "rate",
+            "like", etc
+        :param iid: id of the item acted on. type str.
+        :param params: optional action attributes. type dictionary.
+            For example, { 'pio_rate' : 4, 'pio_latlng' : [1.23,4.56] }
+            NOTE: the "rate" action requires the pio_rate attribute, an
+            integer value of 1-5 (1 is least preferred and 5 is most
+            preferred)
+
+        :returns:
+            None
+
+        :raises:
+            U2IActionNotCreatedError
+        """
+        # Fire the asynchronous request, then wait for its outcome.
+        async_request = self.arecord_action_on_item(action, iid, params)
+        return self.aresp(async_request)
+
+    def user_conversion_item(self, uid, iid, **params):
+        """Deprecated. Record a user's conversion action on an item and
+        block until the server has responded.
+
+        :param uid: id of the acting user. type str.
+        :param iid: id of the item acted on. type str.
+        :param params: optional attributes, given as keyword arguments.
+            For example, pio_latlng=[123.4, 56.7]
+
+        :returns:
+            None
+
+        :raises:
+            U2IActionNotCreatedError
+        """
+        # Fire the asynchronous request, then wait for its outcome.
+        async_request = self.auser_conversion_item(uid, iid, **params)
+        return self.aresp(async_request)
+
+    def user_dislike_item(self, uid, iid, **params):
+        """Deprecated. Record a user's dislike action on an item and
+        block until the server has responded.
+
+        :param uid: id of the acting user. type str.
+        :param iid: id of the item acted on. type str.
+        :param params: optional attributes, given as keyword arguments.
+            For example, pio_latlng=[123.4, 56.7]
+
+        :returns:
+            None
+
+        :raises:
+            U2IActionNotCreatedError
+        """
+        # Fire the asynchronous request, then wait for its outcome.
+        async_request = self.auser_dislike_item(uid, iid, **params)
+        return self.aresp(async_request)
+
+    def user_like_item(self, uid, iid, **params):
+        """Deprecated. Blocking request recording a user's like of an item.
+
+        :param uid: id of the acting user. type str.
+        :param iid: id of the item acted on. type str.
+        :param params: optional attributes, given as keyword arguments.
+            For example, pio_latlng=[123.4, 56.7]
+
+        :returns:
+            None
+
+        :raises:
+            U2IActionNotCreatedError
+        """
+        # Fire the asynchronous request, then wait for its outcome.
+        async_request = self.auser_like_item(uid, iid, **params)
+        return self.aresp(async_request)
+
+    def user_rate_item(self, uid, iid, rate, **params):
+        """Deprecated. Blocking request recording a user's rating of an item.
+
+        :param uid: id of the acting user. type str.
+        :param iid: id of the item acted on. type str.
+        :param rate: the rating, an integer value of 1-5 (1 is least
+            preferred and 5 is most preferred)
+        :param params: optional attributes, given as keyword arguments.
+            For example, pio_latlng=[123.4, 56.7]
+
+        :returns:
+            None
+
+        :raises:
+            U2IActionNotCreatedError
+        """
+        # Fire the asynchronous request, then wait for its outcome.
+        async_request = self.auser_rate_item(uid, iid, rate, **params)
+        return self.aresp(async_request)
+
+    def user_view_item(self, uid, iid, **params):
+        """Deprecated. Blocking request recording a user's view of an item.
+
+        :param uid: id of the acting user. type str.
+        :param iid: id of the item acted on. type str.
+        :param params: optional attributes, given as keyword arguments.
+            For example, pio_latlng=[123.4, 56.7]
+
+        :returns:
+            None
+
+        :raises:
+            U2IActionNotCreatedError
+        """
+        # Fire the asynchronous request, then wait for its outcome.
+        async_request = self.auser_view_item(uid, iid, **params)
+        return self.aresp(async_request)
+