Merge PR #11, prep 0.9.6. Overwrote the PR's conflicting changes to predictionio/__init__.py and fixed the rest
diff --git a/.DS_Store b/.DS_Store
new file mode 100644
index 0000000..af48d0a
--- /dev/null
+++ b/.DS_Store
Binary files differ
diff --git a/docs/source/conf.py b/docs/source/conf.py
index 539cf04..d52d83e 100644
--- a/docs/source/conf.py
+++ b/docs/source/conf.py
@@ -52,9 +52,9 @@
# built documents.
#
# The short X.Y version.
-version = '0.8'
+version = '0.9'
# The full version, including alpha/beta/rc tags.
-release = '0.8.3'
+release = '0.9.6'
# The language for content autogenerated by Sphinx. Refer to documentation
# for a list of supported languages.
diff --git a/docs/source/predictionio.rst b/docs/source/predictionio.rst
index 265ff51..99ecbeb 100644
--- a/docs/source/predictionio.rst
+++ b/docs/source/predictionio.rst
@@ -3,15 +3,18 @@
.. automodule:: predictionio
-The SDK comprises of two clients:
+The SDK comprises two clients:
-1. EventClient, it is for importing data into the PredictionIO platform.
+1. EventClient, it is for importing data into the PredictionIO platform.
2. EngineClient, it is for querying PredictionIO Engine Instance, submit query
and extract prediction results.
-Please read `PredictionIO Quick Start
-<http://docs.prediction.io/0.8.3/recommendation/quickstart.html>`_ for
-detailed explanation.
+The SDK also provides a FileExporter for you to write events to a JSON file
+in the same way as EventClient. The JSON file can be used by "pio import"
+for batch data import.
+
+Please read `PredictionIO Event API <http://docs.prediction.io/datacollection/eventapi/>`_ for an explanation of
+how the SDK can be used to import events.
predictionio.EventClient Class
------------------------------
@@ -38,7 +41,7 @@
handling concurrent requests (although setting "threads" to 1 will still
work). The optimal setting depends on your system and application
requirement.
-
+
predictionio.EngineClient Class
------------------------------
@@ -46,12 +49,21 @@
.. autoclass:: EngineClient
:members:
+
predictionio.AsyncRequest Class
------------------------------
.. autoclass:: AsyncRequest
:members:
+predictionio.FileExporter Class
+-------------------------------
+
+.. versionadded:: 0.9.2
+.. autoclass:: FileExporter
+ :members:
+
+
predictionio SDK Usage Notes
-------------------------
@@ -78,8 +90,7 @@
In some cases you may not care whether the request is successful for performance or application-specific reasons, then you can simply skip step 2.
.. note::
- If you do care about the request status or need to get the return data, then at a later time you will need to call :meth:`~Client.aresp` with the AsyncRequest object returned in step 1.
- Please refer to the documentation of :ref:`asynchronous request methods <async-methods-label>` for more details.
+ If you do care about the request status or need to get the return data, then at a later time you will need to call :meth:`~AsyncRequest.get_response` on the AsyncRequest object returned in step 1.
For example, the following code first generates an asynchronous request to
retrieve recommendations, then get the result at later time::
@@ -94,7 +105,7 @@
>>> <log the error>
-Batch Import Data
+Batch Import Data with EventClient
^^^^^^^^^^^^^^^^^^^^^
When you import large amount of data at once, you may also use asynchronous
@@ -121,4 +132,12 @@
>>> except:
>>> <log the error>
+Batch Import Data with FileExporter and "pio import"
+^^^^^^^^^^^^^^^^^^^^^^^
+.. versionadded:: 0.9.2
+
+You can use FileExporter to create events and write them to a JSON file that can
+be used by "pio import". Please see `Importing Data in Batch <http://docs.prediction.io/datacollection/batchimport/>`_ for more details.
+
+Note that this method is much faster than batch import with EventClient.
diff --git a/examples/event_fileexport.py b/examples/event_fileexport.py
new file mode 100644
index 0000000..0f86d71
--- /dev/null
+++ b/examples/event_fileexport.py
@@ -0,0 +1,44 @@
+# Example: write events to a JSON file with FileExporter instead of sending
+# them to the Event Server. The file can then be imported with "pio import".
+import predictionio
+from datetime import datetime
+import pytz
+
+exporter = predictionio.FileExporter(file_name="test.json")
+
+# First event
+first_event_properties = {
+ "prop1" : 1,
+ "prop2" : "value2",
+ "prop3" : [1, 2, 3],
+ "prop4" : True,
+ "prop5" : ["a", "b", "c"],
+ "prop6" : 4.56 ,
+ }
+# Attach a pytz zone with localize(); passing it as the tzinfo argument of
+# datetime() silently uses the zone's Local Mean Time offset instead of MST.
+first_event_time = pytz.timezone('US/Mountain').localize(datetime(
+ 2004, 12, 13, 21, 39, 45, 618000))
+exporter.create_event(
+ event="my_event",
+ entity_type="user",
+ entity_id="uid",
+ properties=first_event_properties,
+ event_time=first_event_time,
+ )
+
+# Second event
+second_event_properties = {
+ "someProperty" : "value1",
+ "anotherProperty" : "value2",
+ }
+exporter.create_event(
+ event="my_event",
+ entity_type="user",
+ entity_id="uid",
+ target_entity_type="item",
+ target_entity_id="iid",
+ properties=second_event_properties,
+ event_time=datetime(2014, 12, 13, 21, 38, 45, 618000, pytz.utc))
+
+exporter.close()
diff --git a/examples/event_sample_channel.py b/examples/event_sample_channel.py
new file mode 100644
index 0000000..125086a
--- /dev/null
+++ b/examples/event_sample_channel.py
@@ -0,0 +1,87 @@
+# This example imports events into a specific channel of an app.
+
+from predictionio import EventClient
+from predictionio import NotFoundError
+from datetime import datetime
+import pytz
+import sys
+
+access_key = None
+channel = None
+
+assert access_key is not None, "Please create an access key with 'pio app new'"
+# The channel must be created before events can be sent to it.
+assert channel is not None, "Please create new channel with 'pio app channel-new'"
+
+client = EventClient(access_key=access_key, url="http://localhost:7070",
+ channel=channel)
+
+# Check status
+print("Check status")
+print(client.get_status())
+
+# First event
+first_event_properties = {
+ "prop1" : 1,
+ "prop2" : "value2",
+ "prop3" : [1, 2, 3],
+ "prop4" : True,
+ "prop5" : ["a", "b", "c"],
+ "prop6" : 4.56 ,
+ }
+# Attach a pytz zone with localize(); passing it as the tzinfo argument of
+# datetime() silently uses the zone's Local Mean Time offset instead of MST.
+first_event_time = pytz.timezone('US/Mountain').localize(datetime(
+ 2004, 12, 13, 21, 39, 45, 618000))
+first_event_response = client.create_event(
+ event="my_event",
+ entity_type="user",
+ entity_id="uid",
+ properties=first_event_properties,
+ event_time=first_event_time,
+ )
+print("First Event response")
+print(first_event_response)
+# print("") emits a blank line on both Python 2 and 3; a bare "print" is a
+# no-op expression on Python 3.
+print("")
+
+# Second event
+second_event_properties = {
+ "someProperty" : "value1",
+ "anotherProperty" : "value2",
+ }
+second_event_response = client.create_event(
+ event="my_event",
+ entity_type="user",
+ entity_id="uid",
+ target_entity_type="item",
+ target_entity_id="iid",
+ properties=second_event_properties,
+ event_time=datetime(2014, 12, 13, 21, 38, 45, 618000, pytz.utc))
+print("Second Event response")
+print(second_event_response)
+print("")
+
+
+# Get the first event from Event Server
+first_event_id = first_event_response.json_body["eventId"]
+print("Get Event")
+event = client.get_event(first_event_id)
+print(event)
+print("")
+
+# Delete the first event from Event Server
+print("Delete Event")
+delete_response = client.delete_event(first_event_id)
+print(delete_response)
+print("")
+
+# Deleting the first event again should raise NotFoundError.
+print("Delete Event Again")
+# "except X as e" works on Python 2.6+ and 3; "except X, e" is Python 2 only.
+try:
+ delete_response = client.delete_event(first_event_id)
+except NotFoundError as ex:
+ print("The expected error: {0}".format(ex))
+print("")
diff --git a/predictionio/__init__.py b/predictionio/__init__.py
index f4eb87a..e5e5387 100644
--- a/predictionio/__init__.py
+++ b/predictionio/__init__.py
@@ -1,20 +1,30 @@
-"""PredictoinIO Python SDK
-
-The PredictoinIO Python SDK provides easy-to-use functions for integrating
+"""PredictionIO Python SDK
+The PredictionIO Python SDK provides easy-to-use functions for integrating
Python applications with PredictionIO REST API services.
"""
-__version__ = "0.8.5"
+
+__version__ = "0.9.6"
+
+# import deprecated libraries.
+from predictionio.obsolete import Client, InvalidArgumentError
# import packages
import re
+try:
+ import httplib
+except ImportError:
+ # pylint: disable=F0401
+ # http is a Python3 module, replacing httplib
+ from http import client as httplib
try:
- import httplib
+ from urllib import urlencode, quote
except ImportError:
- # pylint: disable=F0401
- # http is a Python3 module, replacing httplib
- from http import client as httplib
+ # pylint: disable=F0401,E0611
+ from urllib.parse import urlencode, quote
+
+import json
import urllib
from datetime import datetime
@@ -22,434 +32,526 @@
from predictionio.connection import Connection
from predictionio.connection import AsyncRequest
-from predictionio.connection import AsyncResponse # noqa
+from predictionio.connection import AsyncResponse
from predictionio.connection import PredictionIOAPIError
class NotCreatedError(PredictionIOAPIError):
- pass
+ pass
class NotFoundError(PredictionIOAPIError):
- pass
+ pass
def event_time_validation(t):
- """ Validate event_time according to EventAPI Specification.
- """
+ """ Validate event_time according to EventAPI Specification.
+ """
- if t is None:
- return datetime.now(pytz.utc)
+ if t is None:
+ return datetime.now(pytz.utc)
- if type(t) != datetime:
- raise AttributeError("event_time must be datetime.datetime")
+ if type(t) != datetime:
+ raise AttributeError("event_time must be datetime.datetime")
- if t.tzinfo is None:
- raise AttributeError("event_time must have tzinfo")
+ if t.tzinfo is None:
+ raise AttributeError("event_time must have tzinfo")
- return t
+ return t
class BaseClient(object):
- def __init__(self, url, threads=1, qsize=0, timeout=5):
- """Constructor of Client object.
+ def __init__(self, url, threads=1, qsize=0, timeout=5):
+ """Constructor of Client object.
+ """
+ self.threads = threads
+ self.url = url
+ self.qsize = qsize
+ self.timeout = timeout
- """
- self.threads = threads
- self.url = url
- self.qsize = qsize
- self.timeout = timeout
+ # check connection type
+ https_pattern = r'^https://(.*)'
+ http_pattern = r'^http://(.*)'
+ m = re.match(https_pattern, url)
+ self.https = True
+ if m is None: # not matching https
+ m = re.match(http_pattern, url)
+ self.https = False
+ if m is None: # not matching http either
+ raise InvalidArgumentError("url is not valid: %s" % url)
+ self.host = m.group(1)
- # check connection type
- https_pattern = r'^https://(.*)'
- http_pattern = r'^http://(.*)'
- m = re.match(https_pattern, url)
- self.https = True
- if m is None: # not matching https
- m = re.match(http_pattern, url)
- self.https = False
- if m is None: # not matching http either
- raise InvalidArgumentError("url is not valid: %s" % url) # noqa
- self.host = m.group(1)
+ self._uid = None # identified uid
+ self._connection = Connection(host=self.host, threads=self.threads,
+ qsize=self.qsize, https=self.https,
+ timeout=self.timeout)
- self._uid = None # identified uid
- self._connection = Connection(host=self.host, threads=self.threads,
- qsize=self.qsize, https=self.https,
- timeout=self.timeout)
+ def close(self):
+ """Close this client and the connection.
+ Call this method when you want to completely terminate the connection
+ with PredictionIO.
+ It will wait for all pending requests to finish.
+ """
+ self._connection.close()
- def close(self):
- """Close this client and the connection.
+ def pending_requests(self):
+ """Return the number of pending requests.
+ :returns:
+ The number of pending requests of this client.
+ """
+ return self._connection.pending_requests()
- Call this method when you want to completely terminate the connection
- with PredictionIO.
- It will wait for all pending requests to finish.
- """
- self._connection.close()
+ def get_status(self):
+ """Get the status of the PredictionIO API Server
+ :returns:
+ status message.
+ :raises:
+ ServerStatusError.
+ """
+ path = "/"
+ request = AsyncRequest("GET", path)
+ request.set_rfunc(self._aget_resp)
+ self._connection.make_request(request)
+ result = request.get_response()
+ return result
- def pending_requests(self):
- """Return the number of pending requests.
+ def _acreate_resp(self, response):
+ if response.error is not None:
+ raise NotCreatedError("Exception happened: %s for request %s" %
+ (response.error, response.request))
+ elif response.status != httplib.CREATED:
+ raise NotCreatedError("request: %s status: %s body: %s" %
+ (response.request, response.status,
+ response.body))
- :returns:
- The number of pending requests of this client.
- """
- return self._connection.pending_requests()
+ return response
- def get_status(self):
- """Get the status of the PredictionIO API Server
+ def _aget_resp(self, response):
+ if response.error is not None:
+ raise NotFoundError("Exception happened: %s for request %s" %
+ (response.error, response.request))
+ elif response.status != httplib.OK:
+ raise NotFoundError("request: %s status: %s body: %s" %
+ (response.request, response.status,
+ response.body))
- :returns:
- status message.
+ return response.json_body
- :raises:
- ServerStatusError.
- """
- path = "/"
- request = AsyncRequest("GET", path)
- request.set_rfunc(self._aget_resp)
- self._connection.make_request(request)
- result = request.get_response()
- return result
+ def _adelete_resp(self, response):
+ if response.error is not None:
+ raise NotFoundError("Exception happened: %s for request %s" %
+ (response.error, response.request))
+ elif response.status != httplib.OK:
+ raise NotFoundError("request: %s status: %s body: %s" %
+ (response.request, response.status,
+ response.body))
- def _acreate_resp(self, response):
- if response.error is not None:
- raise NotCreatedError("Exception happened: %s for request %s" %
- (response.error, response.request))
- elif response.status != httplib.CREATED:
- raise NotCreatedError("request: %s status: %s body: %s" %
- (response.request, response.status,
- response.body))
-
- return response
-
- def _aget_resp(self, response):
- if response.error is not None:
- raise NotFoundError("Exception happened: %s for request %s" %
- (response.error, response.request))
- elif response.status != httplib.OK:
- raise NotFoundError("request: %s status: %s body: %s" %
- (response.request, response.status,
- response.body))
-
- return response.json_body
-
- def _adelete_resp(self, response):
- if response.error is not None:
- raise NotFoundError("Exception happened: %s for request %s" %
- (response.error, response.request))
- elif response.status != httplib.OK:
- raise NotFoundError("request: %s status: %s body: %s" %
- (response.request, response.status,
- response.body))
-
- return response.body
+ return response.body
class EventClient(BaseClient):
- """Client for importing data into PredictionIO Event Server.
+ """Client for importing data into PredictionIO Event Server.
+ Notice that app_id has been deprecated as of 0.8.2. Please use access_token
+ instead.
+ :param access_key: the access key for your application.
+ :param url: the url of PredictionIO Event Server.
+ :param threads: number of threads to handle PredictionIO API requests.
+ Must be >= 1.
+ :param qsize: the max size of the request queue (optional).
+ The asynchronous request becomes blocking once this size has been
+ reached, until the queued requests are handled.
+ Default value is 0, which means infinite queue size.
+ :param timeout: timeout for HTTP connection attempts and requests in
+ seconds (optional).
+ Default value is 5.
+ :param channel: channel name (optional)
+ """
- Notice that app_id has been deprecated as of 0.8.2. Please use access_token
- instead.
+ def __init__(self, access_key,
+ url="http://localhost:7070",
+ threads=1, qsize=0, timeout=5, channel=None):
+ assert type(access_key) is str, ("access_key must be string. "
+ "Notice that app_id has been deprecated in Prediction.IO 0.8.2. "
+ "Please use access_key instead.")
- :param access_key: the access key for your application.
- :param url: the url of PredictionIO Event Server.
- :param threads: number of threads to handle PredictionIO API requests.
- Must be >= 1.
- :param qsize: the max size of the request queue (optional).
- The asynchronous request becomes blocking once this size has been
- reached, until the queued requests are handled.
- Default value is 0, which means infinite queue size.
- :param timeout: timeout for HTTP connection attempts and requests in
- seconds (optional).
- Default value is 5.
+ super(EventClient, self).__init__(url, threads, qsize, timeout)
+
+ if len(access_key) <= 8:
+ raise DeprecationWarning(
+ "It seems like you are specifying an app_id. It is deprecated in "
+ "Prediction.IO 0.8.2. Please use access_key instead. Or, "
+ "you may use an earlier version of this sdk.")
+
+ self.access_key = access_key
+ self.channel = channel
+
+ def acreate_event(self, event, entity_type, entity_id,
+ target_entity_type=None, target_entity_id=None, properties=None,
+ event_time=None):
+ """Asynchronously create an event.
+ :param event: event name. type str.
+ :param entity_type: entity type. It is the namespace of the entityId and
+ analogous to the table name of a relational database. The entityId must be
+ unique within same entityType. type str.
+ :param entity_id: entity id. *entity_type-entity_id* becomes the unique
+ identifier of the entity. For example, you may have entityType named user,
+ and different entity IDs, say 1 and 2. In this case, user-1 and user-2
+ uniquely identifies entities. type str
+ :param target_entity_type: target entity type. type str.
+ :param target_entity_id: target entity id. type str.
+ :param properties: a custom dict associated with an event. type dict.
+ :param event_time: the time of the event. type datetime, must contain
+ timezone info.
+ :returns:
+ AsyncRequest object. You can call the get_response() method using this
+ object to get the final resuls or status of this asynchronous request.
"""
-
- def __init__(self, access_key,
- url="http://localhost:7070",
- threads=1, qsize=0, timeout=5):
- assert type(access_key) is str, ("access_key must be string. "
- "Notice that app_id has been deprecated in Prediction.IO 0.8.2. "
- "Please use access_key instead.")
-
- super(EventClient, self).__init__(url, threads, qsize, timeout)
-
- if len(access_key) <= 8:
- raise DeprecationWarning(
- "It seems like you are specifying an app_id. It is deprecated in "
- "Prediction.IO 0.8.2. Please use access_key instead. Or, "
- "you may use an earlier version of this sdk.")
-
- self.access_key = access_key
-
- def acreate_event(self, event, entity_type, entity_id,
- target_entity_type=None, target_entity_id=None, properties=None,
- event_time=None):
- """Asynchronously create an event.
-
- :param event: event name. type str.
- :param entity_type: entity type. It is the namespace of the entityId and
- analogous to the table name of a relational database. The entityId must be
- unique within same entityType. type str.
- :param entity_id: entity id. *entity_type-entity_id* becomes the unique
- identifier of the entity. For example, you may have entityType named user,
- and different entity IDs, say 1 and 2. In this case, user-1 and user-2
- uniquely identifies entities. type str
- :param target_entity_type: target entity type. type str.
- :param target_entity_id: target entity id. type str.
- :param properties: a custom dict associated with an event. type dict.
- :param event_time: the time of the event. type datetime, must contain
- timezone info.
-
- :returns:
- AsyncRequest object. You can call the get_response() method using this
- object to get the final resuls or status of this asynchronous request.
- """
- data = {
- "event": event,
- "entityType": entity_type,
- "entityId": entity_id,
+ data = {
+ "event": event,
+ "entityType": entity_type,
+ "entityId": entity_id,
}
- if target_entity_type is not None:
- data["targetEntityType"] = target_entity_type
+ if target_entity_type is not None:
+ data["targetEntityType"] = target_entity_type
- if target_entity_id is not None:
- data["targetEntityId"] = target_entity_id
+ if target_entity_id is not None:
+ data["targetEntityId"] = target_entity_id
- if properties is not None:
- data["properties"] = properties
+ if properties is not None:
+ data["properties"] = properties
- et = event_time_validation(event_time)
- # EventServer uses milliseconds, but python datetime class uses micro. Hence
- # need to skip the last three digits.
- et_str = et.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + et.strftime("%z")
- data["eventTime"] = et_str
+ et = event_time_validation(event_time)
+ # EventServer uses milliseconds, but python datetime class uses micro. Hence
+ # need to skip the last three digits.
+ et_str = et.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + et.strftime("%z")
+ data["eventTime"] = et_str
- path = "/events.json?accessKey=%s" % (self.access_key, )
- request = AsyncRequest("POST", path, **data)
- request.set_rfunc(self._acreate_resp)
- self._connection.make_request(request)
- return request
+ qparam = {
+ "accessKey" : self.access_key
+ }
- def create_event(self, event, entity_type, entity_id,
- target_entity_type=None, target_entity_id=None, properties=None,
- event_time=None):
- """Synchronously (blocking) create an event."""
- return self.acreate_event(event, entity_type, entity_id,
- target_entity_type, target_entity_id, properties,
- event_time).get_response()
+ if self.channel is not None:
+ qparam["channel"] = self.channel
- def aget_event(self, event_id):
- """Asynchronouly get an event from Event Server.
+ path = "/events.json?%s" % (urlencode(qparam), )
- :param event_id: event id returned by the EventServer when creating the
- event.
+ request = AsyncRequest("POST", path, **data)
+ request.set_rfunc(self._acreate_resp)
+ self._connection.make_request(request)
+ return request
- :returns:
- AsyncRequest object.
- """
- enc_event_id = urllib.quote(event_id, "") # replace special char with %xx
- path = "/events/%s.json" % enc_event_id
- request = AsyncRequest("GET", path, accessKey=self.access_key)
- request.set_rfunc(self._aget_resp)
- self._connection.make_request(request)
- return request
+ def create_event(self, event, entity_type, entity_id,
+ target_entity_type=None, target_entity_id=None, properties=None,
+ event_time=None):
+ """Synchronously (blocking) create an event."""
+ return self.acreate_event(event, entity_type, entity_id,
+ target_entity_type, target_entity_id, properties,
+ event_time).get_response()
- def get_event(self, event_id):
- """Synchronouly get an event from Event Server."""
- return self.aget_event(event_id).get_response()
+ def aget_event(self, event_id):
+ """Asynchronously get an event (from this client's channel, if set).
+ :param event_id: event id returned by the EventServer when creating the
+ event.
+ :returns:
+ AsyncRequest object.
+ """
+ qparam = {
+ "accessKey" : self.access_key
+ }
- def adelete_event(self, event_id):
- """Asynchronouly delete an event from Event Server.
+ if self.channel is not None:
+ qparam["channel"] = self.channel
- :param event_id: event id returned by the EventServer when creating the
- event.
+ enc_event_id = quote(event_id, "") # %xx-escape all special chars, "/" included (urllib.quote is Py2-only)
+ path = "/events/%s.json" % (enc_event_id, )
+ request = AsyncRequest("GET", path, **qparam)
+ request.set_rfunc(self._aget_resp)
+ self._connection.make_request(request)
+ return request
- :returns:
- AsyncRequest object.
- """
- enc_event_id = urllib.quote(event_id, "") # replace special char with %xx
- path = "/events/%s.json" % (enc_event_id, )
- request = AsyncRequest("DELETE", path, accessKey=self.access_key)
- request.set_rfunc(self._adelete_resp)
- self._connection.make_request(request)
- return request
+ def get_event(self, event_id):
+ """Synchronouly get an event from Event Server."""
+ return self.aget_event(event_id).get_response()
- def delete_event(self, event_id):
- """Synchronouly delete an event from Event Server."""
- return self.adelete_event(event_id).get_response()
+ def aget_events(self, startTime=None, untilTime=None, entityType=None, entityId=None, limit=None, reversed=False):
+ """Asynchronously get events from Event Server. (Getting events through the Event Server API is intended for debugging and is not recommended for production)
+ :param startTime: time in ISO8601 format. Return events with eventTime >= startTime.
+ :param untilTime: time in ISO8601 format. Return events with eventTime < untilTime.
+ :param entityType: String. The entityType. Return events for this entityType only.
+ :param entityId: String. The entityId. Return events for this entityId only.
+ :param limit: Integer. The number of record events returned. Default is 20. -1 to get all.
+ :param reversed: Boolean. Must be used with both entityType and entityId specified,
+ returns events in reversed chronological order. Default is false.
+ :returns:
+ AsyncRequest object.
+ """
+ qparam = {
+ "accessKey" : self.access_key,
+ "reversed": reversed
+ }
- # Below are helper functions
+ if startTime is not None:
+ qparam["startTime"] = startTime
- def aset_user(self, uid, properties={}, event_time=None):
- """Set properties of a user.
+ if untilTime is not None:
+ qparam["untilTime"] = untilTime
- Wrapper of acreate_event function, setting event to "$set" and entity_type
- to "user".
- """
- return self.acreate_event(
- event="$set",
- entity_type="user",
- entity_id=uid,
- properties=properties,
- event_time=event_time,
+ if entityType is not None:
+ qparam["entityType"] = entityType
+
+ if entityId is not None:
+ qparam["entityId"] = entityId
+
+ if limit is not None:
+ qparam["limit"] = limit
+
+ if self.channel is not None:
+ qparam["channel"] = self.channel
+ path = "/events.json"
+ request = AsyncRequest("GET", path, **qparam)
+ request.set_rfunc(self._aget_resp)
+ self._connection.make_request(request)
+ return request
+
+ def get_events(self, startTime=None, untilTime=None, entityType=None, entityId=None, limit=None, reversed=False):
+ """Synchronously get events from Event Server. (Getting events through the Event Server API is intended for debugging and is not recommended for production)"""
+ return self.aget_events(
+ startTime=startTime,
+ untilTime=untilTime,
+ entityType=entityType,
+ entityId=entityId,
+ limit=limit,
+ reversed=reversed
+ ).get_response()
+
+ def adelete_event(self, event_id):
+ """Asynchronously delete an event (from this client's channel, if set).
+ :param event_id: event id returned by the EventServer when creating the
+ event.
+ :returns:
+ AsyncRequest object; its response raises NotFoundError unless HTTP 200.
+ """
+ qparam = {
+ "accessKey" : self.access_key
+ }
+
+ if self.channel is not None:
+ qparam["channel"] = self.channel
+
+ enc_event_id = quote(event_id, "") # %xx-escape all special chars, "/" included (urllib.quote is Py2-only)
+ path = "/events/%s.json" % (enc_event_id, )
+ request = AsyncRequest("DELETE", path, **qparam)
+ request.set_rfunc(self._adelete_resp)
+ self._connection.make_request(request)
+ return request
+
+ def delete_event(self, event_id):
+ """Synchronously delete an event from Event Server."""
+ return self.adelete_event(event_id).get_response()
+
+ ## Below are helper functions
+
+ def aset_user(self, uid, properties={}, event_time=None):
+ """Set properties of a user.
+ Wrapper of acreate_event function, setting event to "$set" and entity_type
+ to "user".
+ """
+ return self.acreate_event(
+ event="$set",
+ entity_type="user",
+ entity_id=uid,
+ properties=properties,
+ event_time=event_time,
+ )
+
+ def set_user(self, uid, properties={}, event_time=None):
+ """Set properties of a user"""
+ return self.aset_user(uid, properties, event_time).get_response()
+
+ def aunset_user(self, uid, properties, event_time=None):
+ """Unset properties of a user.
+ Wrapper of acreate_event function, setting event to "$unset" and entity_type
+ to "user".
+ """
+ # NOTE(review): per the original note, properties cannot be empty for $unset; it is not validated here.
+ return self.acreate_event(
+ event="$unset",
+ entity_type="user",
+ entity_id=uid,
+ properties=properties,
+ event_time=event_time,
)
- def set_user(self, uid, properties={}, event_time=None):
- """Set properties of a user"""
- return self.aset_user(uid, properties, event_time).get_response()
+ def unset_user(self, uid, properties, event_time=None):
+ """Unset properties of a user."""
+ return self.aunset_user(uid, properties, event_time).get_response()
- def aunset_user(self, uid, properties, event_time=None):
- """Unset properties of an user.
+ def adelete_user(self, uid, event_time=None):
+ """Delete a user.
+ Wrapper of acreate_event function, setting event to "$delete" and entity_type
+ to "user".
+ """
+ return self.acreate_event(
+ event="$delete",
+ entity_type="user",
+ entity_id=uid,
+ event_time=event_time)
- Wrapper of acreate_event function, setting event to "$unset" and entity_type
- to "user".
- """
- # check properties={}, it cannot be empty
- return self.acreate_event(
- event="$unset",
- entity_type="user",
- entity_id=uid,
- properties=properties,
- event_time=event_time,
- )
+ def delete_user(self, uid, event_time=None):
+ """Delete a user."""
+ return self.adelete_user(uid, event_time).get_response()
- def unset_user(self, uid, properties, event_time=None):
- """Set properties of a user"""
- return self.aunset_user(uid, properties, event_time).get_response()
+ def aset_item(self, iid, properties={}, event_time=None):
+ """Set properties of an item.
+ Wrapper of acreate_event function, setting event to "$set" and entity_type
+ to "item".
+ """
+ return self.acreate_event(
+ event="$set",
+ entity_type="item",
+ entity_id=iid,
+ properties=properties,
+ event_time=event_time)
- def adelete_user(self, uid, event_time=None):
- """Delete a user.
+ def set_item(self, iid, properties={}, event_time=None):
+ """Set properties of an item."""
+ return self.aset_item(iid, properties, event_time).get_response()
- Wrapper of acreate_event function, setting event to "$delete" and entity_type
- to "user".
- """
- return self.acreate_event(
- event="$delete",
- entity_type="user",
- entity_id=uid,
- event_time=event_time)
+ def aunset_item(self, iid, properties={}, event_time=None):
+ """Unset properties of an item.
+ Wrapper of acreate_event function, setting event to "$unset" and entity_type
+ to "item".
+ """
+ return self.acreate_event(
+ event="$unset",
+ entity_type="item",
+ entity_id=iid,
+ properties=properties,
+ event_time=event_time)
- def delete_user(self, uid, event_time=None):
- """Delete a user."""
- return self.adelete_user(uid, event_time).get_response()
+ def unset_item(self, iid, properties={}, event_time=None):
+ """Unset properties of an item."""
+ return self.aunset_item(iid, properties, event_time).get_response()
- def aset_item(self, iid, properties={}, event_time=None):
- """Set properties of an item.
+ def adelete_item(self, iid, event_time=None):
+ """Delete an item.
+ Wrapper of acreate_event function, setting event to "$delete" and entity_type
+ to "item".
+ """
+ return self.acreate_event(
+ event="$delete",
+ entity_type="item",
+ entity_id=iid,
+ event_time=event_time)
- Wrapper of acreate_event function, setting event to "$set" and entity_type
- to "item".
- """
- return self.acreate_event(
- event="$set",
- entity_type="item",
- entity_id=iid,
- properties=properties,
- event_time=event_time)
+ def delete_item(self, iid, event_time=None):
+ """Delete an item."""
+ return self.adelete_item(iid, event_time).get_response()
- def set_item(self, iid, properties={}, event_time=None):
- """Set properties of an item."""
- return self.aset_item(iid, properties, event_time).get_response()
+ def arecord_user_action_on_item(self, action, uid, iid, properties={},
+ event_time=None):
+ """Create a user-to-item action.
+ Wrapper of acreate_event function, setting entity_type to "user" and
+ target_entity_type to "item".
+ """
+ return self.acreate_event(
+ event=action,
+ entity_type="user",
+ entity_id=uid,
+ target_entity_type="item",
+ target_entity_id=iid,
+ properties=properties,
+ event_time=event_time)
- def aunset_item(self, iid, properties={}, event_time=None):
- """Unset properties of an item.
-
- Wrapper of acreate_event function, setting event to "$unset" and entity_type
- to "item".
- """
- return self.acreate_event(
- event="$unset",
- entity_type="item",
- entity_id=iid,
- properties=properties,
- event_time=event_time)
-
- def unset_item(self, iid, properties={}, event_time=None):
- """Unset properties of an item."""
- return self.aunset_item(iid, properties, event_time).get_response()
-
- def adelete_item(self, iid, event_time=None):
- """Delete an item.
-
- Wrapper of acreate_event function, setting event to "$delete" and entity_type
- to "item".
- """
- return self.acreate_event(
- event="$delete",
- entity_type="item",
- entity_id=iid,
- event_time=event_time)
-
- def delete_item(self, iid, event_time=None):
- """Delete an item."""
- return self.adelete_item(iid, event_time).get_response()
-
- def arecord_user_action_on_item(self, action, uid, iid, properties={},
- event_time=None):
- """Create a user-to-item action.
-
- Wrapper of acreate_event function, setting entity_type to "user" and
- target_entity_type to "item".
- """
- return self.acreate_event(
- event=action,
- entity_type="user",
- entity_id=uid,
- target_entity_type="item",
- target_entity_id=iid,
- properties=properties,
- event_time=event_time)
-
- def record_user_action_on_item(self, action, uid, iid, properties={},
- event_time=None):
- """Create a user-to-item action."""
- return self.arecord_user_action_on_item(
- action, uid, iid, properties, event_time).get_response()
+ def record_user_action_on_item(self, action, uid, iid, properties={},
+ event_time=None):
+ """Create a user-to-item action."""
+ return self.arecord_user_action_on_item(
+ action, uid, iid, properties, event_time).get_response()
class EngineClient(BaseClient):
- """Client for extracting prediction results from an PredictionIO Engine
- Instance.
+ """Client for extracting prediction results from an PredictionIO Engine
+ Instance.
+ :param url: the url of the PredictionIO Engine Instance.
+ :param threads: number of threads to handle PredictionIO API requests.
+ Must be >= 1.
+ :param qsize: the max size of the request queue (optional).
+ The asynchronous request becomes blocking once this size has been
+ reached, until the queued requests are handled.
+ Default value is 0, which means infinite queue size.
+ :param timeout: timeout for HTTP connection attempts and requests in
+ seconds (optional).
+ Default value is 5.
+ """
+ def __init__(self, url="http://localhost:8000", threads=1,
+ qsize=0, timeout=5):
+ super(EngineClient, self).__init__(url, threads, qsize, timeout)
- :param url: the url of the PredictionIO Engine Instance.
- :param threads: number of threads to handle PredictionIO API requests.
- Must be >= 1.
- :param qsize: the max size of the request queue (optional).
- The asynchronous request becomes blocking once this size has been
- reached, until the queued requests are handled.
- Default value is 0, which means infinite queue size.
- :param timeout: timeout for HTTP connection attempts and requests in
- seconds (optional).
- Default value is 5.
-
+ def asend_query(self, data):
+ """Asynchronously send a request to the engine instance with data as the
+ query.
+ :param data: the query: It is coverted to an json object using json.dumps
+ method. type dict.
+ :returns:
+ AsyncRequest object. You can call the get_response() method using this
+ object to get the final resuls or status of this asynchronous request.
"""
+ path = "/queries.json"
+ request = AsyncRequest("POST", path, **data)
+ request.set_rfunc(self._aget_resp)
+ self._connection.make_request(request)
+ return request
- def __init__(self, url="http://localhost:8000", threads=1,
- qsize=0, timeout=5):
- super(EngineClient, self).__init__(url, threads, qsize, timeout)
+ def send_query(self, data):
+ """Synchronously send a request.
+ :param data: the query: It is coverted to an json object using json.dumps
+ method. type dict.
+ :returns: the prediction.
+ """
+ return self.asend_query(data).get_response()
- def asend_query(self, data):
- """Asynchronously send a request to the engine instance with data as the
- query.
+class FileExporter(object):
+ """File exporter to write events to JSON file for batch import
+ :param file_name: the destination file name
+ """
+ def __init__(self, file_name):
+ """Constructor of Exporter.
+ """
+ self._file = open(file_name, 'w')
- :param data: the query: It is coverted to an json object using json.dumps
- method. type dict.
+ def create_event(self, event, entity_type, entity_id,
+ target_entity_type=None, target_entity_id=None, properties=None,
+ event_time=None):
+ """Create an event and write to the file.
+ (please refer to EventClient's create_event())
+ """
+ data = {
+ "event": event,
+ "entityType": entity_type,
+ "entityId": entity_id,
+ }
- :returns:
- AsyncRequest object. You can call the get_response() method using this
- object to get the final resuls or status of this asynchronous request.
- """
- path = "/queries.json"
- request = AsyncRequest("POST", path, **data)
- request.set_rfunc(self._aget_resp)
- self._connection.make_request(request)
- return request
+ if target_entity_type is not None:
+ data["targetEntityType"] = target_entity_type
- def send_query(self, data):
- """Synchronously send a request.
+ if target_entity_id is not None:
+ data["targetEntityId"] = target_entity_id
- :param data: the query: It is coverted to an json object using json.dumps
- method. type dict.
+ if properties is not None:
+ data["properties"] = properties
- :returns: the prediction.
- """
- return self.asend_query(data).get_response()
+ et = event_time_validation(event_time)
+ # EventServer uses milliseconds, but python datetime class uses micro. Hence
+ # need to skip the last three digits.
+ et_str = et.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + et.strftime("%z")
+ data["eventTime"] = et_str
+
+ j = json.dumps(data)
+ self._file.write(j+"\n")
+
+ def close(self):
+ """Close the FileExporter
+ Call this method when you finish writing all events to JSON file
+ """
+ self._file.close()
diff --git a/predictionio/connection.py b/predictionio/connection.py
index 8c231f4..8205bb2 100644
--- a/predictionio/connection.py
+++ b/predictionio/connection.py
@@ -157,9 +157,52 @@
def set_error(self, error):
self.error = error
- def set_request(self, request):
- self.request = request
+class AsyncResponse(object):
+ """Store the response of asynchronous request
+ When get the response, user should check if error is None (which means no
+ Exception happens).
+ If error is None, then should check if the status is expected.
+ """
+
+ def __init__(self):
+ #: exception object if any happens
+ self.error = None
+
+ self.version = None
+ self.status = None
+ self.reason = None
+ #: Response header. str
+ self.headers = None
+ #: Response body. str
+ self.body = None
+ #: Jsonified response body. Remains None if conversion is unsuccessful.
+ self.json_body = None
+ #: Point back to the AsyncRequest object
+ self.request = None
+
+ def __str__(self):
+ return "e:%s v:%s s:%s r:%s h:%s b:%s" % (self.error, self.version,
+ self.status, self.reason,
+ self.headers, self.body)
+
+ def set_resp(self, version, status, reason, headers, body):
+ self.version = version
+ self.status = status
+ self.reason = reason
+ self.headers = headers
+ self.body = body
+ # Try to extract the json.
+ try:
+ self.json_body = json.loads(body.decode('utf8'))
+ except ValueError as ex:
+ self.json_body = None
+
+ def set_error(self, error):
+ self.error = error
+
+ def set_request(self, request):
+ self.request = request
class PredictionIOHttpConnection(object):
def __init__(self, host, https=True, timeout=5):
diff --git a/setup.py b/setup.py
index f0e1038..67b12e3 100644
--- a/setup.py
+++ b/setup.py
@@ -10,7 +10,7 @@
setup(
name='PredictionIO',
- version="0.8.5",
+ version="0.9.6",
author=__author__,
author_email=__email__,
packages=['predictionio'],