Merge branch 'develop' for 0.9.2 release
diff --git a/.DS_Store b/.DS_Store
new file mode 100644
index 0000000..af48d0a
--- /dev/null
+++ b/.DS_Store
Binary files differ
diff --git a/docs/source/conf.py b/docs/source/conf.py
index cbcb864..0d87655 100644
--- a/docs/source/conf.py
+++ b/docs/source/conf.py
@@ -52,9 +52,9 @@
# built documents.
#
# The short X.Y version.
-version = '0.8'
+version = '0.9'
# The full version, including alpha/beta/rc tags.
-release = '0.8.3'
+release = '0.9.2'
# The language for content autogenerated by Sphinx. Refer to documentation
# for a list of supported languages.
diff --git a/docs/source/predictionio.rst b/docs/source/predictionio.rst
index 265ff51..99ecbeb 100644
--- a/docs/source/predictionio.rst
+++ b/docs/source/predictionio.rst
@@ -3,15 +3,18 @@
.. automodule:: predictionio
-The SDK comprises of two clients:
+The SDK comprises two clients:
-1. EventClient, it is for importing data into the PredictionIO platform.
+1. EventClient, it is for importing data into the PredictionIO platform.
2. EngineClient, it is for querying PredictionIO Engine Instance, submit query
and extract prediction results.
-Please read `PredictionIO Quick Start
-<http://docs.prediction.io/0.8.3/recommendation/quickstart.html>`_ for
-detailed explanation.
+The SDK also provides a FileExporter, which writes events to a JSON file using
+the same interface as EventClient. The JSON file can then be loaded with
+"pio import" for batch data import.
+
+Please read `PredictionIO Event API <http://docs.prediction.io/datacollection/eventapi/>`_ for an explanation of
+how the SDK can be used to import events.
predictionio.EventClient Class
------------------------------
@@ -38,7 +41,7 @@
handling concurrent requests (although setting "threads" to 1 will still
work). The optimal setting depends on your system and application
requirement.
-
+
predictionio.EngineClient Class
------------------------------
@@ -46,12 +49,21 @@
.. autoclass:: EngineClient
:members:
+
predictionio.AsyncRequest Class
------------------------------
.. autoclass:: AsyncRequest
:members:
+predictionio.FileExporter Class
+-------------------------------
+
+.. versionadded:: 0.9.2
+.. autoclass:: FileExporter
+ :members:
+
+
predictionio SDK Usage Notes
-------------------------
@@ -78,8 +90,7 @@
In some cases you may not care whether the request is successful for performance or application-specific reasons, then you can simply skip step 2.
.. note::
- If you do care about the request status or need to get the return data, then at a later time you will need to call :meth:`~Client.aresp` with the AsyncRequest object returned in step 1.
- Please refer to the documentation of :ref:`asynchronous request methods <async-methods-label>` for more details.
+ If you do care about the request status or need to get the return data, then at a later time you will need to call :meth:`~AsyncRequest.get_response` with the AsyncRequest object returned in step 1.
For example, the following code first generates an asynchronous request to
retrieve recommendations, then get the result at later time::
@@ -94,7 +105,7 @@
>>> <log the error>
-Batch Import Data
-^^^^^^^^^^^^^^^^^^^^^
+Batch Import Data with EventClient
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
When you import large amount of data at once, you may also use asynchronous
@@ -121,4 +132,12 @@
>>> except:
>>> <log the error>
+Batch Import Data with FileExporter and "pio import"
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+.. versionadded:: 0.9.2
+
+You can use FileExporter to create events and write them to a JSON file, which
+can then be used by "pio import". Please see `Importing Data in Batch <http://docs.prediction.io/datacollection/batchimport/>`_ for more details.
+
+Note that this method is much faster than batch import with EventClient.
diff --git a/examples/event_fileexport.py b/examples/event_fileexport.py
new file mode 100644
index 0000000..0f86d71
--- /dev/null
+++ b/examples/event_fileexport.py
@@ -0,0 +1,39 @@
+import predictionio
+from datetime import datetime
+import pytz
+
+exporter = predictionio.FileExporter(file_name="test.json")
+
+first_event_properties = {
+ "prop1" : 1,
+ "prop2" : "value2",
+ "prop3" : [1, 2, 3],
+ "prop4" : True,
+ "prop5" : ["a", "b", "c"],
+ "prop6" : 4.56 ,
+ }
+first_event_time = datetime(
+ 2004, 12, 13, 21, 39, 45, 618000, pytz.timezone('US/Mountain'))
+exporter.create_event(
+ event="my_event",
+ entity_type="user",
+ entity_id="uid",
+ properties=first_event_properties,
+ event_time=first_event_time,
+ )
+
+# Second event
+second_event_properties = {
+ "someProperty" : "value1",
+ "anotherProperty" : "value2",
+ }
+exporter.create_event(
+ event="my_event",
+ entity_type="user",
+ entity_id="uid",
+ target_entity_type="item",
+ target_entity_id="iid",
+ properties=second_event_properties,
+ event_time=datetime(2014, 12, 13, 21, 38, 45, 618000, pytz.utc))
+
+exporter.close()
diff --git a/examples/event_sample.py b/examples/event_sample.py
index 0a7a339..9c3fe25 100644
--- a/examples/event_sample.py
+++ b/examples/event_sample.py
@@ -98,7 +98,7 @@
print("Delete user")
print(client.delete_user("foo"))
-# The SDK also support specifying the eventTime. It is useful for importing
+# The SDK also support specifying the eventTime. It is useful for importing
# events happened in the past.
foo_time = datetime(2014, 8, 31, 4, 56, tzinfo=pytz.timezone('US/Pacific'))
print("Create user at " + str(foo_time))
diff --git a/examples/event_sample_channel.py b/examples/event_sample_channel.py
new file mode 100644
index 0000000..125086a
--- /dev/null
+++ b/examples/event_sample_channel.py
@@ -0,0 +1,82 @@
+# this is example of import events to a specific channel of an App
+
+from predictionio import EventClient
+from predictionio import NotFoundError
+from datetime import datetime
+import pytz
+import sys
+
+access_key = None
+channel = None
+
+assert access_key is not None, "Please create an access key with 'pio app new'"
+# Need to create channel first before
+assert channel is not None, "Please create new channel with 'pio app channel-new'"
+
+client = EventClient(access_key=access_key, url="http://localhost:7070",
+ channel=channel)
+
+# Check status
+print("Check status")
+print(client.get_status())
+
+# First event
+first_event_properties = {
+ "prop1" : 1,
+ "prop2" : "value2",
+ "prop3" : [1, 2, 3],
+ "prop4" : True,
+ "prop5" : ["a", "b", "c"],
+ "prop6" : 4.56 ,
+ }
+first_event_time = datetime(
+ 2004, 12, 13, 21, 39, 45, 618000, pytz.timezone('US/Mountain'))
+first_event_response = client.create_event(
+ event="my_event",
+ entity_type="user",
+ entity_id="uid",
+ properties=first_event_properties,
+ event_time=first_event_time,
+ )
+print("First Event response")
+print(first_event_response)
+print
+
+# Second event
+second_event_properties = {
+ "someProperty" : "value1",
+ "anotherProperty" : "value2",
+ }
+second_event_response = client.create_event(
+ event="my_event",
+ entity_type="user",
+ entity_id="uid",
+ target_entity_type="item",
+ target_entity_id="iid",
+ properties=second_event_properties,
+ event_time=datetime(2014, 12, 13, 21, 38, 45, 618000, pytz.utc))
+print("Second Event response")
+print(second_event_response)
+print
+
+
+# Get the first event from Event Server
+first_event_id = first_event_response.json_body["eventId"]
+print("Get Event")
+event = client.get_event(first_event_id)
+print(event)
+print
+
+# Delete the first event from Event Server
+print("Delete Event")
+delete_response = client.delete_event(first_event_id)
+print(delete_response)
+print
+
+# Delete the first event from Event Server again should yield exception.
+print("Delete Event Again")
+try:
+ delete_response = client.delete_event(first_event_id)
+except NotFoundError, ex:
+ print("The expected error: {0}".format(ex))
+print
diff --git a/predictionio/__init__.py b/predictionio/__init__.py
index 45a379f..8f40a4f 100644
--- a/predictionio/__init__.py
+++ b/predictionio/__init__.py
@@ -5,7 +5,7 @@
"""
-__version__ = "0.8.3"
+__version__ = "0.9.2"
# import deprecated libraries.
from predictionio.obsolete import Client
@@ -18,6 +18,13 @@
# pylint: disable=F0401
# http is a Python3 module, replacing httplib
from http import client as httplib
+
+try:
+ from urllib import urlencode
+except ImportError:
+ # pylint: disable=F0401,E0611
+ from urllib.parse import urlencode
+
import json
import urllib
@@ -165,11 +172,12 @@
:param timeout: timeout for HTTP connection attempts and requests in
seconds (optional).
Default value is 5.
+ :param channel: channel name (optional)
"""
- def __init__(self, access_key,
+ def __init__(self, access_key,
url="http://localhost:7070",
- threads=1, qsize=0, timeout=5):
+ threads=1, qsize=0, timeout=5, channel=None):
assert type(access_key) is str, ("access_key must be string. "
"Notice that app_id has been deprecated in Prediction.IO 0.8.2. "
"Please use access_key instead.")
@@ -183,6 +191,7 @@
"you may use an earlier version of this sdk.")
self.access_key = access_key
+ self.channel = channel
def acreate_event(self, event, entity_type, entity_id,
target_entity_type=None, target_entity_id=None, properties=None,
@@ -201,7 +210,7 @@
:param target_entity_id: target entity id. type str.
:param properties: a custom dict associated with an event. type dict.
:param event_time: the time of the event. type datetime, must contain
- timezone info.
+ timezone info.
:returns:
AsyncRequest object. You can call the get_response() method using this
@@ -227,8 +236,16 @@
# need to skip the last three digits.
et_str = et.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + et.strftime("%z")
data["eventTime"] = et_str
-
- path = "/events.json?accessKey=%s" % (self.access_key, )
+
+ qparam = {
+ "accessKey" : self.access_key
+ }
+
+ if self.channel is not None:
+ qparam["channel"] = self.channel
+
+ path = "/events.json?%s" % (urlencode(qparam), )
+
request = AsyncRequest("POST", path, **data)
request.set_rfunc(self._acreate_resp)
self._connection.make_request(request)
@@ -239,7 +256,7 @@
event_time=None):
"""Synchronously (blocking) create an event."""
return self.acreate_event(event, entity_type, entity_id,
- target_entity_type, target_entity_id, properties,
+ target_entity_type, target_entity_id, properties,
event_time).get_response()
def aget_event(self, event_id):
@@ -249,11 +266,18 @@
event.
:returns:
- AsyncRequest object.
+ AsyncRequest object.
"""
+ qparam = {
+ "accessKey" : self.access_key
+ }
+
+ if self.channel is not None:
+ qparam["channel"] = self.channel
+
enc_event_id = urllib.quote(event_id, "") # replace special char with %xx
- path = "/events/%s.json" % enc_event_id
- request = AsyncRequest("GET", path, accessKey=self.access_key)
+ path = "/events/%s.json" % (enc_event_id, )
+ request = AsyncRequest("GET", path, **qparam)
request.set_rfunc(self._aget_resp)
self._connection.make_request(request)
return request
@@ -269,11 +293,18 @@
event.
:returns:
- AsyncRequest object.
+ AsyncRequest object.
"""
+ qparam = {
+ "accessKey" : self.access_key
+ }
+
+ if self.channel is not None:
+ qparam["channel"] = self.channel
+
enc_event_id = urllib.quote(event_id, "") # replace special char with %xx
path = "/events/%s.json" % (enc_event_id, )
- request = AsyncRequest("DELETE", path, accessKey=self.access_key)
+ request = AsyncRequest("DELETE", path, **qparam)
request.set_rfunc(self._adelete_resp)
self._connection.make_request(request)
return request
@@ -286,7 +317,7 @@
def aset_user(self, uid, properties={}, event_time=None):
"""Set properties of a user.
-
+
Wrapper of acreate_event function, setting event to "$set" and entity_type
to "user".
"""
@@ -304,7 +335,7 @@
def aunset_user(self, uid, properties, event_time=None):
"""Unset properties of an user.
-
+
Wrapper of acreate_event function, setting event to "$unset" and entity_type
to "user".
"""
@@ -323,7 +354,7 @@
def adelete_user(self, uid, event_time=None):
"""Delete a user.
-
+
Wrapper of acreate_event function, setting event to "$delete" and entity_type
to "user".
"""
@@ -339,7 +370,7 @@
def aset_item(self, iid, properties={}, event_time=None):
"""Set properties of an item.
-
+
Wrapper of acreate_event function, setting event to "$set" and entity_type
to "item".
"""
@@ -356,7 +387,7 @@
def aunset_item(self, iid, properties={}, event_time=None):
"""Unset properties of an item.
-
+
Wrapper of acreate_event function, setting event to "$unset" and entity_type
to "item".
"""
@@ -373,7 +404,7 @@
def adelete_item(self, iid, event_time=None):
"""Delete an item.
-
+
Wrapper of acreate_event function, setting event to "$delete" and entity_type
to "item".
"""
@@ -413,7 +444,7 @@
class EngineClient(BaseClient):
"""Client for extracting prediction results from an PredictionIO Engine
Instance.
-
+
:param url: the url of the PredictionIO Engine Instance.
:param threads: number of threads to handle PredictionIO API requests.
Must be >= 1.
@@ -424,7 +455,7 @@
:param timeout: timeout for HTTP connection attempts and requests in
seconds (optional).
Default value is 5.
-
+
"""
def __init__(self, url="http://localhost:8000", threads=1,
qsize=0, timeout=5):
@@ -435,7 +466,7 @@
query.
:param data: the query: It is coverted to an json object using json.dumps
- method. type dict.
+ method. type dict.
:returns:
AsyncRequest object. You can call the get_response() method using this
@@ -449,10 +480,60 @@
def send_query(self, data):
"""Synchronously send a request.
-
+
:param data: the query: It is coverted to an json object using json.dumps
- method. type dict.
+ method. type dict.
:returns: the prediction.
"""
return self.asend_query(data).get_response()
+
+class FileExporter(object):
+ """File exporter to write events to JSON file for batch import
+
+ :param file_name: the destination file name
+ """
+ def __init__(self, file_name):
+ """Constructor of Exporter.
+
+ """
+ self._file = open(file_name, 'w')
+
+ def create_event(self, event, entity_type, entity_id,
+ target_entity_type=None, target_entity_id=None, properties=None,
+ event_time=None):
+ """Create an event and write to the file.
+
+ (please refer to EventClient's create_event())
+
+ """
+ data = {
+ "event": event,
+ "entityType": entity_type,
+ "entityId": entity_id,
+ }
+
+ if target_entity_type is not None:
+ data["targetEntityType"] = target_entity_type
+
+ if target_entity_id is not None:
+ data["targetEntityId"] = target_entity_id
+
+ if properties is not None:
+ data["properties"] = properties
+
+ et = event_time_validation(event_time)
+ # EventServer uses milliseconds, but python datetime class uses micro. Hence
+ # need to skip the last three digits.
+ et_str = et.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + et.strftime("%z")
+ data["eventTime"] = et_str
+
+ j = json.dumps(data)
+ self._file.write(j+"\n")
+
+ def close(self):
+ """Close the FileExporter
+
+ Call this method when you finish writing all events to JSON file
+ """
+ self._file.close()
diff --git a/predictionio/connection.py b/predictionio/connection.py
index 9609047..04fe7b5 100644
--- a/predictionio/connection.py
+++ b/predictionio/connection.py
@@ -152,8 +152,8 @@
self.body = body
# Try to extract the json.
try:
- self.json_body = json.loads(body)
- except ValueError, ex:
+ self.json_body = json.loads(body.decode('utf8'))
+ except ValueError as ex:
self.json_body = None
def set_error(self, error):
diff --git a/setup.py b/setup.py
index 24bfc03..15163d1 100644
--- a/setup.py
+++ b/setup.py
@@ -10,7 +10,7 @@
setup(
name='PredictionIO',
- version="0.8.3",
+ version="0.9.2",
author=__author__,
author_email=__email__,
packages=['predictionio'],