# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from datetime import datetime
from datetime import timedelta

import ast
import logging
import random

# The following imports are only needed for the Spark/Hive push at the
# bottom of the file; comment them out if there is no push to Hive.
from pyspark import SparkContext
from pyspark.sql import HiveContext, Row

logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.INFO)

def read_users():
    """Load the list of user records from users.txt."""
    # users.txt holds a Python-literal list of user dicts;
    # ast.literal_eval parses it without executing arbitrary code.
    with open('users.txt', 'r') as f:
        users = ast.literal_eval(f.read())
    return users
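
# A minimal sketch of what users.txt is expected to hold: a Python-literal
# list of user dicts. The field names follow their use below ('g', 'a' and
# 'r' feed the uckey); the values shown here are purely hypothetical.
# [{'id': '1', 'activity': 'h', 'g': 'g_m', 'a': '25', 'r': 'r1'},
#  {'id': '2', 'activity': 'l', 'g': 'g_f', 'a': '32', 'r': 'r2'}]
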
def calculate_loop_size(datetime_record):
    """Scale hourly traffic volume by day of week, hour of day and holidays."""
    # datetime.weekday() numbers Monday as 0 and Sunday as 6;
    # weekend traffic is boosted (Saturday x3, Sunday x5).
    day_of_week_dic = {0: 1, 1: 1, 2: 1, 3: 1, 4: 1, 5: 3, 6: 5}
    # Hourly shape: quiet overnight, ramping up to a noon peak.
    hour_dic = {0: 1, 1: 1, 2: 1, 3: 1, 4: 1, 5: 1, 6: 2, 7: 3, 8: 4, 9: 5, 10: 5, 11: 5, 12: 6, 13: 6, 14: 5, 15: 5,
                16: 4, 17: 4, 18: 4, 19: 4, 20: 3, 21: 3, 22: 3, 23: 2}
    # Dates in 2018 that get an extra holiday boost.
    holidays = ['2018-01-01', '2018-01-15', '2018-02-19', '2018-03-31', '2018-05-28', '2018-07-04', '2018-09-03',
                '2018-11-22', '2018-11-23', '2018-12-25']
    base_loop_size_for_each_user = 2
    day_of_week_factor = day_of_week_dic[datetime_record.weekday()]
    hour_factor = hour_dic[datetime_record.hour]
    holiday_str = datetime.strftime(datetime_record, '%Y-%m-%d')
    holiday_factor = 3 if holiday_str in holidays else 1
    return int(base_loop_size_for_each_user * day_of_week_factor * hour_factor * holiday_factor)
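
# For example, 2018-01-01 12:00 falls on a Monday (day factor 1) during the
# noon peak (hour factor 6) and is a listed holiday (factor 3), so each user
# generates int(2 * 1 * 6 * 3) = 36 traffic records in that hour.
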
def is_user_active_enough(user):
    """Randomly decide whether a user acts, based on its activity level."""
    # r is uniform on 1..20, so 'h' users are always active, 'm' users
    # about half the time (r even), 'l' users about a fifth of the time.
    r = random.randint(1, 20)
    if user['activity'] == 'h':
        return True
    if user['activity'] == 'm' and r % 2 == 0:
        return True
    if user['activity'] == 'l' and r % 5 == 0:
        return True
    return False
def get_user_activity_value(user):
    """Map an activity level to a weight: 'h' -> 10, 'm' -> 7, 'l' -> 3, else 1."""
    return {'h': 10, 'm': 7, 'l': 3}.get(user['activity'], 1)
def generate_traffic_for_user(user):
    """Derive one traffic record from a user record, with randomized slot attributes."""
    traffic = user.copy()
    traffic.pop('id', None)
    traffic.pop('activity', None)
    r = random.randint(1, 20)
    # Smaller si values are more likely; the odds are sketched below.
    si = '1'
    if r % 3 == 0:
        si = '2'
    if r % 5 == 0:
        si = '3'
    traffic['si'] = si
    # Media and network type are drawn uniformly from the global lists
    # defined in __main__.
    traffic['m'] = random.choice(m_list)
    traffic['t'] = random.choice(t_list)
    return traffic
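
# With r uniform on 1..20, si ends up '3' for r in {5, 10, 15, 20} (20%),
# '2' for r in {3, 6, 9, 12, 18} (25%), and '1' otherwise (55%); note that
# r = 15 matches both branches and settles on '3'.
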
def create_new_ucdoc(uckey, traffic):
    """Create a per-uckey document that accumulates traffic counts by day."""
    ucdoc = traffic.copy()
    ucdoc['uckey'] = uckey
    ucdoc['days'] = {}
    return ucdoc
def create_empty_hours():
    """Return 24 hourly buckets, each tracking counters h0..h3 and a total."""
    doc = {'h0': 0, 'h1': 0, 'h2': 0, 'h3': 0, 'total': 0}
    return [doc.copy() for _ in range(24)]
def add_to_ucdocs(ucdocs, date, traffic, traffic_value):
    """Accumulate one traffic record into the hourly counts of its uckey document."""
    # The uckey concatenates the traffic attributes; 'pt' and 'icc' are
    # fixed placeholder components.
    uckey_list = [traffic['m'], traffic['si'], traffic['t'],
                  traffic['g'], traffic['a'], 'pt', traffic['r'], 'icc']
    uckey = ','.join(uckey_list)
    date_str = datetime.strftime(date, '%Y-%m-%d')
    if uckey not in ucdocs:
        ucdocs[uckey] = create_new_ucdoc(uckey, traffic)
    ucdoc = ucdocs[uckey]
    if date_str not in ucdoc['days']:
        ucdoc['days'][date_str] = create_empty_hours()
    # h0 is intentionally left at zero; h1..h3 grow by the user's activity
    # value plus a fixed offset per record.
    ucdoc['days'][date_str][date.hour]['h0'] += 0
    ucdoc['days'][date_str][date.hour]['h1'] += 1 + traffic_value
    ucdoc['days'][date_str][date.hour]['h2'] += 2 + traffic_value
    ucdoc['days'][date_str][date.hour]['h3'] += 3 + traffic_value
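
# A resulting uckey looks like 'magazinelock,2,4G,<g>,<a>,pt,<r>,icc', where
# <g>, <a> and <r> are taken verbatim from the user record in users.txt.
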
def push_ucdocs_to_hive(factdata_table_name, ucdocs, bucket_size):
    """Flatten the ucdocs into Rows and append them to the Hive fact table."""
    data_list = []
    for ucdoc in ucdocs:
        for day in ucdoc['days']:
            for hour in range(24):
                uckey = ucdoc['uckey']
                # Note: Python 3 randomizes hash() per process unless
                # PYTHONHASHSEED is set, so bucket_id is only stable
                # within a single run.
                bucket_id = hash(uckey) % bucket_size
                # Encode the four hourly counters as 'index:count' strings.
                count_array = []
                for i in range(4):
                    count = ucdoc['days'][day][hour]['h' + str(i)]
                    count_array.append(str(i) + ':' + str(count))
                # For Python 2.7 use uckey=uckey.decode('utf-8')
                row = Row(uckey=uckey,
                          bucket_id=bucket_id,
                          count_array=count_array,
                          hour=hour,
                          day=str(day))
                data_list.append(row)
    # hive_context is created in __main__ before this function is called.
    df = hive_context.createDataFrame(data_list)
    # Spark 1.6:
    # df.select('uckey', 'bucket_id', 'count_array', 'hour', 'day'
    #           ).write.option("header", "true").option("encoding", "UTF-8"
    #           ).mode('append').partitionBy("day").saveAsTable(factdata_table_name)
    # Spark 2.3; 'day' must stay last so it lines up with the partition column.
    df.select('uckey',
              'bucket_id',
              'count_array',
              'hour',
              'day'
              ).write.format('hive').option("header", "true").option("encoding", "UTF-8").mode('append').insertInto(factdata_table_name)
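
# Each emitted row has this shape (the counts here are illustrative):
# Row(uckey='magazinelock,2,4G,...', bucket_id=17,
#     count_array=['0:0', '1:44', '2:55', '3:66'], hour=9, day='2018-01-01')
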
if __name__ == '__main__':
    users = read_users()
    start_date_str = '2018-01-01'
    end_date_str = '2018-04-01'
    factdata_table_name = 'factdata3m2'
    start_date = datetime.strptime(start_date_str, '%Y-%m-%d')
    end_date = datetime.strptime(end_date_str, '%Y-%m-%d')
    m_list = ['cloudFolder', 'magazinelock', 'minusonepage']
    t_list = ['3G', '4G', '5G']
    ucdocs = {}

    # Loop over the date range one hour at a time, generating traffic
    # for every user in each hour.
    date = start_date
    while date < end_date:
        loop_size = calculate_loop_size(date)
        logging.info(date)
        for _ in range(loop_size):
            for user in users:
                traffic_value = get_user_activity_value(user)
                traffic = generate_traffic_for_user(user)
                add_to_ucdocs(ucdocs, date, traffic, traffic_value)
        date = date + timedelta(hours=1)

    # The following pushes the generated ucdocs to Hive.
    sc = SparkContext()
    hive_context = HiveContext(sc)

    # Recreate the fact table from scratch.
    command = """
    DROP TABLE IF EXISTS {}
    """.format(factdata_table_name)
    hive_context.sql(command)
    command = """
    CREATE TABLE IF NOT EXISTS {}
    (
    uckey string,
    bucket_id int,
    count_array array<string>,
    hour int
    )
    partitioned by (day string)
    """.format(factdata_table_name)
hive_context.setConf("hive.exec.dynamic.partition", "true")
hive_context.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
hive_context.sql(command)
bucket_size = 64
push_ucdocs_to_hive(factdata_table_name, ucdocs.values(), bucket_size)
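
# A typical launch on a Hive-enabled Spark installation (the script name
# here is illustrative):
#   spark-submit datagen.py
# users.txt must sit in the working directory of the driver process.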