| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| |
| # http://www.apache.org/licenses/LICENSE-2.0.html |
| |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
import hashlib
import timeit
from datetime import datetime, timedelta

from pyspark import SparkContext
from pyspark.sql.functions import col, udf
from pyspark.sql.types import BooleanType, IntegerType, StringType
from pyspark.sql import HiveContext
from util import load_config, load_batch_config, load_df
from util import write_to_table_with_partition, generate_add_keywords, print_batching_info, resolve_placeholder
| |
| |
| def clean_persona(df, did_bucket_num): |
| """ |
| This cleans persona table to have distinct did, gender and age. |
| Every did is associated to only one age and gender |
| [Row(did=u'a1f5e194fa29ecb1a10a8c813731d7b289cf8b2a30cf8b7784cf5e8258e2bb9d', |
| gender_new_dev=u'1', forecast_age_dev=u'4')] |
| """ |
| df = df.select('did', 'gender', 'age').distinct() |
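    # Count how many distinct (gender, age) combinations each did has and keep only the dids with exactly one.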
| df_did_count = df.groupBy('did').agg({'did': 'count'}) |
| df = df.join(df_did_count, on=['did'], how='left') |
| df = df.filter(col('count(did)') == 1) |
| df = df.drop(col('count(did)')) |
| df = df.withColumn("gender", df["gender"].cast(IntegerType())) |
| df = df.withColumn("age", df["age"].cast(IntegerType())) |
| df = add_did_bucket(df, did_bucket_num) |
| return df |
| |
| |
| def add_day(df): |
| # action_time has the format "%Y-%m-%d %H:%M:%S" |
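    # e.g. '2021-01-02 11:23:45' -> '2021-01-02' (the first 10 characters)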
| df = df.withColumn('day', udf(lambda x: x[:10], StringType())(col('action_time'))) |
| return df |
| |
| |
| def add_did_bucket(df, n): |
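    # Assigns each did to one of n stable buckets by hashing the did with SHA-256;
    # did_bucket is later used as a partition column of the output tables.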
| def __hash_sha256(s): |
| hex_value = hashlib.sha256(s.encode('utf-8')).hexdigest() |
| return int(hex_value, 16) |
| _udf = udf(lambda x: __hash_sha256(x) % n) |
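    # Note: the UDF declares no return type, so it defaults to StringType and did_bucket is stored as a string.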
| df = df.withColumn('did_bucket', _udf(df.did)) |
| return df |
| |
| |
| def clean_batched_log(df, df_persona, conditions, df_keywords, did_bucket_num): |
| """ |
| df: show-log or click-log dataframe |
| df_persona: persona dataframe |
| conditions: list of slot-ids and associated media-category |
| df_keywords: keywords-spread-app-id dataframe |
| |
| This methods: |
| 1. Filters right slot-ids. |
| 2. Add gender and age from persona table to each record of log |
| 3. Add keyword to each row by looking to spread-app-id |
| """ |
| def filter_new_si(df, new_slot_id_list): |
| """ |
| This filters logs with pre-defined slot-ids. |
| """ |
| new_si_set = set(new_slot_id_list) |
| _udf = udf(lambda x: x in new_si_set, BooleanType()) |
| df = df.filter(_udf(df.slot_id)) |
| return df |
| |
| new_slot_id_list = conditions['new_slot_id_list'] |
| df = filter_new_si(df, new_slot_id_list) |
| df = df.join(df_persona, on=['did'], how='inner') |
| df = df.join(df_keywords, on=['spread_app_id'], how="inner") |
| df = add_day(df) |
| df = add_did_bucket(df, did_bucket_num) |
| return df |
| |
| |
| def clean_logs(cfg, df_persona, df_keywords, log_table_names): |
| sc = SparkContext.getOrCreate() |
| sc.setLogLevel(cfg['log']['level']) |
| hive_context = HiveContext(sc) |
| cfg_clean = cfg['pipeline']['main_clean'] |
| conditions = cfg_clean['conditions'] |
| start_date, end_date, load_minutes = load_batch_config(cfg) |
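    # load_batch_config is expected to return the batch window boundaries (start and end dates as
    # 'YYYY-MM-DD' strings) and the batch length in minutes, read from the config.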
| did_bucket_num = cfg_clean['did_bucket_num'] |
| |
| timer_start = timeit.default_timer() |
| showlog_table, showlog_output_table, clicklog_table, clicklog_output_table = log_table_names |
| starting_time = datetime.strptime(start_date, "%Y-%m-%d") |
| ending_time = datetime.strptime(end_date, "%Y-%m-%d") |
| columns = ['spread_app_id', 'did', 'adv_id', 'media', 'slot_id', 'device_name', 'net_type', 'price_model', |
| 'action_time', 'gender', 'age', 'keyword', 'keyword_index', 'day', 'did_bucket'] |
| |
| batched_round = 1 |
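    # Process the [start_date, end_date) range in windows of load_minutes; each window is
    # loaded, cleaned and written out before the next one is read.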
| while starting_time < ending_time: |
| time_start = starting_time.strftime("%Y-%m-%d %H:%M:%S") |
| batch_time_end = starting_time + timedelta(minutes=load_minutes) |
| batch_time_end = min(batch_time_end, ending_time) |
| time_end = batch_time_end.strftime("%Y-%m-%d %H:%M:%S") |
| print_batching_info("Main clean", batched_round, time_start, time_end) |
| |
| # TODO: Add partitions in the query if the log files are partitioned by day. |
| command = """SELECT |
| did, |
| adv_id, |
| adv_type AS media, |
| slot_id, |
| spread_app_id, |
| device_name, |
| net_type, |
| adv_bill_mode_cd AS price_model, |
| {time} AS action_time |
| FROM {table} WHERE {time} >= '{time_start}' AND {time} < '{time_end}' """ |
| |
| df_showlog_batched = hive_context.sql(command.format(time='show_time', table=showlog_table, time_start=time_start, time_end=time_end)) |
| df_clicklog_batched = hive_context.sql(command.format(time='click_time', table=clicklog_table, time_start=time_start, time_end=time_end)) |
| |
        # Note: with mode='append', Spark may throw a "socket closed" exception; this is due to a Spark bug and does not affect the data or the table.
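        # The first batch overwrites any existing output table; subsequent batches append to it.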
| mode = 'overwrite' if batched_round == 1 else 'append' |
| |
| df_showlog_batched = clean_batched_log(df_showlog_batched, df_persona, conditions, df_keywords, did_bucket_num=did_bucket_num) |
| df_showlog_batched = df_showlog_batched.select(columns) |
| write_to_table_with_partition(df_showlog_batched, showlog_output_table, partition=('day', 'did_bucket'), mode=mode) |
| |
| df_clicklog_batched = clean_batched_log(df_clicklog_batched, df_persona, conditions, df_keywords, did_bucket_num=did_bucket_num) |
| df_clicklog_batched = df_clicklog_batched.select(columns) |
| write_to_table_with_partition(df_clicklog_batched, clicklog_output_table, partition=('day', 'did_bucket'), mode=mode) |
| |
| batched_round += 1 |
| starting_time = batch_time_end |
| |
| timer_end = timeit.default_timer() |
| print('Total batching seconds: ' + str(timer_end - timer_start)) |
| |
| |
| def run(hive_context, cfg): |
| """ |
| # This cleans persona, clicklog and showlog tables, |
| # by having persona table with distinct (did,gender,age) and |
| # by removing unassociated slot-id and did in the log tables. |
| """ |
| cfg_clean = cfg['pipeline']['main_clean'] |
| |
| persona_table = cfg['persona_table_name'] |
| clicklog_table = cfg['clicklog_table_name'] |
| showlog_table = cfg['showlog_table_name'] |
| keywords_table = cfg['keywords_table'] |
| create_keywords = cfg_clean['create_keywords'] |
| |
| persona_new_table = cfg_clean['persona_output_table'] |
| clicklog_new_table = cfg_clean['clicklog_output_table'] |
| showlog_new_table = cfg_clean['showlog_output_table'] |
| |
| did_bucket_num = cfg_clean['did_bucket_num'] |
| |
| command = """SELECT did, |
| gender_new_dev AS gender, |
| forecast_age_dev AS age |
| FROM {}""".format(persona_table) |
| df_persona = hive_context.sql(command) |
| |
| df_persona = clean_persona(df_persona, did_bucket_num) |
| |
    # Keywords are used to clean the clicklog and showlog: log rows without any keyword association are dropped.
    # Create the ad keywords table if it does not exist; otherwise load the existing keywords.
| if create_keywords: |
| df_keywords = generate_add_keywords(keywords_table) |
| else: |
| df_keywords = load_df(hive_context, keywords_table) |
| #[Row(keyword=u'education', keyword_index=1, spread_app_id=u'C100203741')] |
| |
| log_table_names = (showlog_table, showlog_new_table, clicklog_table, clicklog_new_table) |
| |
| clean_logs(cfg, df_persona, df_keywords, log_table_names) |
| |
    write_to_table_with_partition(df_persona, persona_new_table, partition=('did_bucket',), mode='overwrite')
| |
| |
| if __name__ == "__main__": |
| |
| """ |
| main_clean is a process to generate cleaned persona, clicklog and showlog. |
| """ |
| sc, hive_context, cfg = load_config( |
| description="clean data of persona, clicklog and showlog.") |
| resolve_placeholder(cfg) |
| run(hive_context=hive_context, cfg=cfg) |
| sc.stop() |