blob: f0fcb7ce738a87876ce3b34361e062cf3c9f6cfc [file] [log] [blame]
from appdata import AppData
import predictionio
import sys
import pytz
import datetime
def batch_import_task(app_data, client, all_info=False):
# event_time is an important properties used by the PredictionIO platform. It
# is particularly useful in generating training and testing set, which uses
# event_time for splitting. Hence, when we import data, better to make the
# event_time as approximate to fact as possible.
#
# However, in many cases, the data doesn't come with a time. Movie-lens' user
# data, for example, only reveals the age, gender, occupation, and zip code of
# a user. It doesn't report when the user is "created". Likewise, for items,
# it only reports the release date.
#
# To remedy this problem, we have to make some assumptions to the data. In
# this import script, the event_time for user is set to epoch=0, and the
# event_time for item is set to the release_date + 00:00:00 UTC.
print "[Info] Importing users to PredictionIO..."
user_create_time = datetime.datetime.fromtimestamp(0, tz=pytz.utc)
count = 0
set_user_request_list = []
for k, v in app_data.get_users().iteritems():
count += 1
if all_info:
print "[Info] Importing %s..." % v
else:
if (count % 32 == 0):
sys.stdout.write('\r[Info] %s' % count)
sys.stdout.flush()
set_user_request_list.append(
client.aset_user(uid=v.uid, event_time=user_create_time))
[r.get_response() for r in set_user_request_list]
sys.stdout.write('\r[Info] %s users were imported.\n' % count)
sys.stdout.flush()
print "[Info] Importing items to PredictionIO..."
count = 0
set_item_request_list = []
# event_time is a datetime, hence need to add a time component to the release
# date.
midnight_utc = datetime.time(0, 0, 0, tzinfo=pytz.utc)
epoch = datetime.datetime.fromtimestamp(0, tz=pytz.utc)
for k, v in app_data.get_items().iteritems():
count += 1
if all_info:
print "[Info] Importing %s..." % v
else:
if (count % 32 == 0):
sys.stdout.write('\r[Info] %s' % count)
sys.stdout.flush()
itypes = ("movie",) + v.genres
release_datetime = datetime.datetime.combine(
v.release_date,
midnight_utc)
# event_time must be after epoch.
event_time = release_datetime if release_datetime > epoch else epoch
utf8_name = v.name.decode('utf-8', 'ignore')
set_item_request = client.aset_item(
iid=v.iid,
event_time=event_time,
properties={
"itypes": list(itypes),
"starttime": release_datetime.isoformat(),
"name": utf8_name,
"year": v.year } )
set_item_request_list.append(set_item_request)
[r.get_response() for r in set_item_request_list]
sys.stdout.write('\r[Info] %s items were imported.\n' % count)
sys.stdout.flush()
print "[Info] Importing rate actions to PredictionIO..."
count = 0
create_event_request_list = []
for v in app_data.get_rate_actions():
count += 1
if all_info:
print "[Info] Importing %s..." % v
else:
if (count % 32 == 0):
sys.stdout.write('\r[Info] %s' % count)
sys.stdout.flush()
properties = { "rating" : int(v.rating) }
req = client.acreate_event(
event="rate",
entity_type="user",
entity_id=v.uid,
target_entity_type="item",
target_entity_id=v.iid,
properties=properties,
event_time=v.t.replace(tzinfo=pytz.utc),
)
create_event_request_list.append(req)
[r.get_response() for r in create_event_request_list]
sys.stdout.write('\r[Info] %s rate actions were imported.\n' % count)
sys.stdout.flush()
if __name__ == '__main__':
if len(sys.argv) < 3:
sys.exit("Usage: python -m examples.demo-movielens.batch_import "
"<access_key> <url>")
access_key = sys.argv[1]
client = predictionio.EventClient(
access_key=access_key,
url=sys.argv[2],
threads=5,
qsize=500)
# Test connection
print "Status:", client.get_status()
app_data = AppData()
batch_import_task(app_data, client)
client.close()