In advertising, lookalike models are used to build larger audiences from an original audience (the seeds). The larger audience reflects the characteristics of the seeds, which helps advertisers reach a large number of new prospects.
graph TD
  DB1[("Persona Table")]-->DP["Data Preparation"]
  DB2[("Click Table")]-->DP
  DB3[("Show Table")]-->DP
  DB4[("Keyword Table")]-->DP
  DP-->TR[("Train-Ready Table")]
  TR-->TF["TFRecords Generator"]
  TF-->Trainer["Lookalike Model Trainer"]
  Trainer-->Model{{"Lookalike Model"}}
  style Model fill:#f9f,stroke:#333,stroke-width:4px
graph TD
  Model{{"Lookalike Model"}} ---|REST CALL| SG["Score Generator"]
  DB4[("Keyword Table")] -->SG
  DB1[("Persona Table")] -->SG
  SG-->SCT[("Score Table")]
  style Model fill:#f9f,stroke:#333,stroke-width:4px
graph LR
  user("DMP Console")--> AH(["API Handler"])
  AH-->EAR[["Extended-Audience-Reader"]]
  EAR--->HDFS[("HDFS Extended Audience")]
  AH-->AE[["Audience-Extender"]]
  ST[("Score Table")]-->AE
  AE-->HDFS
  style user fill:#f9f,stroke:#333,stroke-width:4px
Data needs to be cleaned and transformed prior to training. The data used for building a lookalike model has to possess the following characteristics:
The Lookalike model aims to establish the correlation between users and ads based on logged user behavior. The log data obtained from the product is usually noisy for various reasons, such as unstable slot_id values, redundant records, or missing data. Data pre-processing is therefore required to clean the original log data before it can be fed into the model trainer. Cleaning steps include transforming the log data to a new format and generating new data tables. This description of the dataframe transformations for log pre-processing is intended to clarify the processing steps on log data and to guide the coding logic of the data pre-processing for the DIN model.
The input tables for the Lookalike model are the following.
>>> df = sql('select * from ads_persona_0520')
>>> df.count()
380000
>>> t1 = df.take(1)
>>> m = map(lambda x: (x[0], x[1], t1[0][x[0]]), df.dtypes)
>>> for _ in m:
...     print(_)
...
### USED Fields
('did', 'string', u'7eb9401a7ba0e377ad2fd81315b39da088b2943d69ff89c3ba56149d0a166d11')
('gender_new_dev', 'string', u'0')
('forecast_age_dev', 'string', u'3')
###
df = sql('select * from ads_showlog_0520')
### USED Fields
('did', 'string', u'fd939b9efd2d9c512bcd360c38d32eedf03f6a11a935295c817f8e61caa2f049')
('adv_id', 'string', u'40023087')
('adv_type', 'string', u'native')
('slot_id', 'string', u'l03493p0r3')
('spread_app_id', 'string', u'C10374976')
('device_name', 'string', u'MAR-AL00')
('net_type', 'string', u'4G')
('adv_bill_mode_cd', 'string', u'CPC')
('show_time', 'string', u'2020-01-07 16:22:18.129')
###
df = sql('select * from ads_clicklog_0520')
### USED Fields
('did', 'string', u'fd939b9efd2d9c512bcd360c38d32eedf03f6a11a935295c817f8e61caa2f049')
('adv_id', 'string', u'40023087')
('adv_type', 'string', u'native')
('slot_id', 'string', u'l03493p0r3')
('spread_app_id', 'string', u'C10374976')
('device_name', 'string', u'MAR-AL00')
('net_type', 'string', u'4G')
('adv_bill_mode_cd', 'string', u'CPC')
('click_time', 'string', u'2020-01-07 16:22:18.129')
###
Clean Persona Table
This operation cleans the persona table in order to:
Clean Show-log and Click-log Tables
The cleaning operation is performed in batches.
Every batch is cleaned according to the following policies:
>>> sql('show partitions lookalike_02022021_limited_clicklog').show(100, False)
+---------------------------+
|partition                  |
+---------------------------+
|day=2019-12-19/did_bucket=0|
|day=2019-12-19/did_bucket=1|
|day=2019-12-20/did_bucket=0|
|day=2019-12-20/did_bucket=1|
+---------------------------+

>>> sql('show partitions lookalike_02022021_limited_showlog').show(100, False)
+---------------------------+
|partition                  |
+---------------------------+
|day=2019-12-19/did_bucket=0|
|day=2019-12-19/did_bucket=1|
|day=2019-12-20/did_bucket=0|
|day=2019-12-20/did_bucket=1|
+---------------------------+

>>> df = sql('select * from lookalike_02022021_limited_showlog')
>>> ...
('spread_app_id', 'string', u'C10608')
('did', 'string', u'029d383c0cfd36dbe45ed42b2b784f06c04a6554b61c3919ea7b3681bc0fda39')
('adv_id', 'string', u'106559')
('media', 'string', u'native')
('slot_id', 'string', u'l2d4ec6csv')
('device_name', 'string', u'LIO-AN00')
('net_type', 'string', u'4G')
('price_model', 'string', u'CPC')
('action_time', 'string', u'2019-12-19 19:23:47.894')
('media_category', 'string', u'Huawei Reading')
('gender', 'int', 1)
('age', 'int', 4)
('keyword', 'string', u'info')
('keyword_index', 'int', 14)
('day', 'string', u'2019-12-19')
('did_bucket', 'string', u'1')
Log tables should be **partitioned by DAY and DID**.
The log unification process is performed in batches.
It processes data for each user partition separately and uses load_logs_in_minutes to load a specific amount of logs at a time.
Here is pseudocode for the process.
for each user-partition:
    start-time  <- config
    finish-time <- config
    batch-size  <- config
    while start-time < finish-time:
        batch-finish-time = start-time + batch-size
        read logs between start-time and batch-finish-time
        union logs
        save or append logs in partitioned tables ('day', 'user-did')
        start-time = batch-finish-time
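A rough PySpark sketch of this loop is shown below. The config keys, the load_logs_in_minutes signature, the unify_batch helper, and the reuse of this project's write_to_table_with_partition helper are assumptions for illustration, not the exact implementation.

```python
from datetime import datetime, timedelta

def unify_logs_in_batches(hive_context, cfg):
    # All cfg keys here are illustrative; the real names come from config.yml.
    start = datetime.strptime(cfg['start_time'], '%Y-%m-%d %H:%M:%S')
    finish = datetime.strptime(cfg['finish_time'], '%Y-%m-%d %H:%M:%S')
    batch = timedelta(minutes=cfg['batch_size_in_minutes'])

    first_batch = True
    while start < finish:
        batch_finish = min(start + batch, finish)
        # load_logs_in_minutes is the loader mentioned above; it is assumed
        # to take a table name and a [start, end) time window.
        show_df = load_logs_in_minutes(hive_context, cfg['showlog_table'], start, batch_finish)
        click_df = load_logs_in_minutes(hive_context, cfg['clicklog_table'], start, batch_finish)
        # unify_batch applies the unification policies listed below.
        logs_df = unify_batch(show_df, click_df, cfg['interval_time_in_seconds'])
        # The first batch overwrites the unified log table, later batches append.
        mode = 'overwrite' if first_batch else 'append'
        write_to_table_with_partition(logs_df, cfg['logs_table'],
                                      partition=('day', 'did_bucket'), mode=mode)
        first_batch = False
        start = batch_finish
```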
Every batch of logs is unified according to the following policies:
Add is_click=1 to click log
Add is_click=0 to show log
Union show and click logs
Add interval_starting_time
This value shows the start_time of an interval.
For example, for the following time series with interval_time_in_seconds=5
[100,101,102,103,104,105,106, 107,108,109]
the interval_starting_time is
[100,100,100,100,100,105,105,105,105,105]
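A minimal PySpark sketch of these policies follows, filling in the unify_batch helper assumed in the earlier sketch. It presumes both cleaned tables already share the action_time column shown in the samples above.

```python
from pyspark.sql import functions as F

def unify_batch(show_df, click_df, interval_time_in_seconds):
    # Tag each record, then union the two cleaned logs.
    show_df = show_df.withColumn('is_click', F.lit(0))
    click_df = click_df.withColumn('is_click', F.lit(1))
    logs_df = show_df.unionByName(click_df)

    # action_time -> unix seconds, then bucket into interval start times:
    # interval_starting_time = floor(t / interval) * interval, which yields
    # [100,100,...,105,...] for the example above with interval 5.
    logs_df = logs_df.withColumn(
        'action_time_seconds',
        F.col('action_time').cast('timestamp').cast('long'))
    logs_df = logs_df.withColumn(
        'interval_starting_time',
        (F.floor(F.col('action_time_seconds') / interval_time_in_seconds)
         * interval_time_in_seconds).cast('int'))
    return logs_df
```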
Here is a sample of this stage output.
>>> sql('show partitions lookalike_02022021_limited_logs').show(100, False)
+---------------------------+
|partition                  |
+---------------------------+
|day=2019-12-19/did_bucket=0|
|day=2019-12-19/did_bucket=1|
|day=2019-12-20/did_bucket=0|
|day=2019-12-20/did_bucket=1|
+---------------------------+

>>> df = sql('select * from lookalike_02022021_limited_logs')
>>> ...
('did', 'string', u'02842b445b7779b9b6cd86a7cde292f15cd10128a8a80f19bea7c310c49160df')
('is_click', 'int', 0)
('action_time', 'string', u'2019-12-19 13:04:14.708')
('keyword', 'string', u'video')
('keyword_index', 'int', 29)
('media', 'string', u'native')
('media_category', 'string', u'Huawei Video')
('net_type', 'string', u'WIFI')
('gender', 'int', 0)
('age', 'int', 5)
('adv_id', 'string', u'40014545')
('interval_starting_time', 'int', 1576713600)
('action_time_seconds', 'int', 1576789454)
('day', 'string', u'2019-12-19')
('did_bucket', 'string', u'0')
TODO
The model trainer reads records from the TFRecords and trains the model. The trained model is saved in HDFS.
The built model is compared against the previous model. If the new model performs better than the old model then it gets deployed and its name is stored in Zookeeper.
Here are the steps to deploy the model.
1. Pull a serving image: docker pull tensorflow/serving:latest-gpu
2. Put the saved model in the .../lookalike/<date>/tfserving directory.
3. Run the serving image:
docker run -p 8501:8501 --mount type=bind,source=/tmp/tfserving,target=/models/<model_name> -e MODEL_NAME=<model_name> -t tensorflow/serving &
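Once the container is running, the deployed model can be smoke-tested through TensorFlow Serving's REST predict endpoint, for example from Python. The payload below is only a placeholder; the real feature names and shapes depend on the model's serving signature.

```python
import json
import requests  # assumes the requests package is installed

model_name = '<model_name>'  # same name passed to the serving container
url = 'http://localhost:8501/v1/models/{}:predict'.format(model_name)

# Placeholder instance; the actual keys/values must match the lookalike
# model's serving signature (e.g. keyword indices and user features).
payload = {'instances': [{'example_feature': [0]}]}

response = requests.post(url, data=json.dumps(payload))
print(response.json())
```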
This part consists of one process, the “Score Table Generator”.
This module uses the Keyword Table, the Persona Table, and the Lookalike Model to generate the Score Table. The scores are normalized.
The score generator runs whenever a new lookalike model is generated. The name of the score table is stored in Zookeeper to be used by API services.
graph TD
  Model{{"Lookalike Model"}} ---|REST CALL| SG["Score Generator"]
  DB4[("Keyword Table")] -->SG
  DB1[("Persona Table")] -->SG
  SG--->|Normalized Scores|SCT[("Score Table")]
  style Model fill:#f9f,stroke:#333,stroke-width:4px
The score table has the following schema.
User | Keyword-Score |
---|---|
User-1 | {kw1:norm-score-11, kw2:norm-score-12, kw3:norm-score-13} |
User-2 | {kw1:norm-score-21, kw2:norm-score-22, kw3:norm-score-23} |
User-3 | {kw1:norm-score-31, kw2:norm-score-32, kw3:norm-score-33} |
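The normalization scheme is not specified in this section. Purely as an illustration, a sketch that scales each user's keyword scores by their sum might look like this; the column names kws (raw scores) and kws_norm (normalized scores) are assumptions.

```python
from pyspark.sql import functions as F
from pyspark.sql.types import MapType, StringType, DoubleType

# Hypothetical normalization for illustration only: scale each user's keyword
# scores by their sum. The actual Score Generator may use a different scheme.
@F.udf(MapType(StringType(), DoubleType()))
def normalize_scores(score_map):
    total = sum(score_map.values())
    if not total:
        return score_map
    return {kw: float(s) / total for kw, s in score_map.items()}

def normalize_score_table(raw_score_df):
    # 'kws' holds the raw keyword->score map produced by the model calls.
    return raw_score_df.withColumn('kws_norm', normalize_scores(F.col('kws')))
```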
This part describes the requirements, API, and high-level architecture for the Lookalike services.
Acronym | Meaning |
---|---|
ES | Elasticsearch |
LS | Lookalike Service |
SG | Score Generator |
ST | Score Table |
The Lookalike Service is a real-time service whose purpose is to take an existing seed audience of users and return a group of additional users who have similar interests. This provides an expanded audience to which advertisers can target their ad campaigns.
Experienced advertisers develop an understanding of the audience that they want to target with their campaigns. This audience consists of the users for whom the advertiser sees the highest conversion rate for their metric of success. However, this approach can exclude a large number of potential users who would be similarly receptive to the advertising campaign. The Lookalike Service is intended to help advertisers expand the audience of their campaigns to users who are similar to their existing audiences but whom they would not otherwise know to target.
The Lookalike Service builds on the work done on the DIN model. From the DIN model, a correlation can be developed between users and topic keywords by the Score Generator. The Score Generator is an offline service that draws data from the DIN model and builds a score for each user for each of a given list of keywords. The score is a numeric metric of the affinity of a user for the topic represented by the given keyword. The scores for all users are stored in the Score Table.
graph TD
  user("DMP Console")--> A(["API Handler"])
  A-->I[["Extended-Audience-Reader"]]
  I---J[("HDFS Extended Audience")]
  A-->|Asynchronism|A1[["Audience-Extender"]]
  A1-->A2[["Queue"]]
  A2-->B[["Audience-Extender-Worker"]]
  B-->|step 1|H["Seed-Data-Reader"]
  B-->|step 2|C["Seed-Clustering"]
  B-->|step 3|D["Similarity-Table-Generator"]
  B-->|step 4|F["User-Selection"]
  F---J
  D---G[("Normalized User-Keyword Score Table")]
  H---G
  style user fill:#f9f,stroke:#333,stroke-width:4px
This module filters the Score Table based on the seeds and extracts the keyword scores for each seed user.
It also builds a keyword reference list that corresponds to the score list in the result dataframe.
The result dataframe has the following structure.
Seed user | Keyword-Score |
---|---|
Seed-user-1 | [norm-score-11, norm-score-12, norm-score-13] |
Seed-user-2 | [norm-score-21, norm-score-22, norm-score-23] |
Seed-user-3 | [norm-score-31, norm-score-32, norm-score-33] |
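A rough PySpark sketch of this step is shown below. It assumes the Score Table columns did and kws_norm from the earlier sketch, a seed_df dataframe holding the uploaded seed dids, and an output column kws_norm_array; all of these names are illustrative.

```python
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, DoubleType

def read_seed_scores(score_df, seed_df, keyword_list):
    """Keep only the seed users and turn each keyword->score map into a
    score array ordered by the keyword reference list."""
    seed_scores = score_df.join(seed_df, on='did', how='inner')

    @F.udf(ArrayType(DoubleType()))
    def to_score_array(score_map):
        return [float(score_map.get(kw, 0.0)) for kw in keyword_list]

    return seed_scores.withColumn('kws_norm_array',
                                  to_score_array(F.col('kws_norm')))
```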
This module clusters the seeds. The output of this subsystem is a dataframe of seed clusters and keyword scores. The number of produced clusters indicates how strongly the seeds share the same features; if the number of clusters ends up very high, the system is probably not able to extend the seeds in a productive way.
Seed cluster | Keyword-Score |
---|---|
Seed-cluster-1 | [score-11, score-12, score-13] |
Seed-cluster-2 | [score-21, score-22, score-23] |
Seed-cluster-3 | [score-31, score-32, score-33] |
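This section does not name the clustering algorithm. Purely as an illustration, a sketch using Spark ML KMeans on the seed score arrays (the column name kws_norm_array is carried over from the sketch above) could look like this, with each cluster center serving as the per-cluster keyword-score array:

```python
from pyspark.ml.clustering import KMeans
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql import functions as F

def cluster_seeds(seed_score_df, k):
    # Spark ML KMeans expects a Vector column, so convert the score array.
    to_vector = F.udf(lambda arr: Vectors.dense(arr), VectorUDT())
    df = seed_score_df.withColumn('features', to_vector('kws_norm_array'))

    model = KMeans(k=k, featuresCol='features', predictionCol='cluster').fit(df)

    # Each cluster center is a keyword-score array that represents one
    # seed cluster, matching the table schema above.
    return [(i, [float(x) for x in center])
            for i, center in enumerate(model.clusterCenters())]
```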
This module produces the user similarity dataframe. The result dataframe has the following schema.
| | top-N-similarity-scores | final-score |
|:---|:---:|:---:|
|user-1| [similarity-score-11, similarity-score-12, similarity-score-13] |final-score-1|
|user-2| [similarity-score-21, similarity-score-22, similarity-score-23] |final-score-2|
|user-3| [similarity-score-31, similarity-score-32, similarity-score-33] |final-score-3|
The Similarity Generator uses an iterative process to produce the top-N-similarity-scores. The following is the algorithm flow chart.
graph TD
  A[/Get user score df/]
  C[Add empty top-N-similarity-scores column]
  C1[For each seed batch]
  F[Add seed-score-array column]
  F1[Add similarity-score-array column]
  G[Update top-N-similarity-scores column]
  G1[Remove seed-score-array, similarity-score-array columns]
  D{More seed batches?}
  H[Add final score column from top-N-similarity-scores]
  Start([Start])-->A-->C-->C1-->F-->F1-->G-->G1-->D-->C1
  D-->H-->E([End])
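The similarity metric is not specified here. As a sketch only, one iteration of the loop above could compute cosine similarity between each user's score array and a batch of seed-cluster score arrays and keep the running top-N; the column names kws_norm_array and top_n_similarity_scores are assumptions.

```python
import math
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, DoubleType

def cosine(a, b):
    dot = sum(x * y for x, y in zip(a, b))
    na = math.sqrt(sum(x * x for x in a))
    nb = math.sqrt(sum(y * y for y in b))
    return dot / (na * nb) if na and nb else 0.0

def update_top_n(user_df, seed_batch_scores, top_n):
    """One iteration of the loop: score every user against one batch of
    seed-cluster score arrays and keep the top-N similarity scores so far."""
    @F.udf(ArrayType(DoubleType()))
    def merge_scores(user_scores, current_top):
        new_scores = [cosine(user_scores, seed) for seed in seed_batch_scores]
        merged = (current_top or []) + new_scores
        return sorted(merged, reverse=True)[:top_n]

    return user_df.withColumn(
        'top_n_similarity_scores',
        merge_scores(F.col('kws_norm_array'), F.col('top_n_similarity_scores')))
```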
This module selects the top M users in the similarity dataframe and saves the result in HDFS. The resulting HDFS file carries the job-id in its path.
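A minimal sketch of this selection, assuming a final_score column, did user IDs, and an illustrative output path layout (the real path template is configured elsewhere):

```python
from pyspark.sql import functions as F

def select_extended_audience(similarity_df, m, job_id, base_path):
    # base_path and the path layout below are illustrative; the real
    # template and M are read from Zookeeper / config.
    top_users = (similarity_df
                 .orderBy(F.col('final_score').desc())
                 .limit(m)
                 .select('did'))
    top_users.write.mode('overwrite').csv(
        '{}/{}/extended-audience'.format(base_path, job_id))
```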
Use of the Lookalike Service is a multi-step process due to the size of the potential audiences and the scale of processing involved in generating the lookalike audiences.
Due to the scale of the computation required when generating the similarities between users (millions × 100 millions), the processing of the Lookalike Service is divided into one REST endpoint to initiate the processing and another REST endpoint to poll the service for completion of the processing.
sequenceDiagram
  DMP Console->>+Service: Upload seed audience
  Service-->>-DMP Console: Job ID
  DMP Console->>+Service: Query process with Job ID
  Service-->>-DMP Console: Running
  DMP Console->>+Service: Query process with Job ID
  Service-->>-DMP Console: Completed + result file URI
Method: POST
Signature: /lookalike/extend-audience
Input: Multipart list of seeds (user dids)
Output: Job reference ID
{ "job-id":"3746AFBBF63" }
This is an asynchronous call that initiates the processing of a lookalike audience and returns with a generated UID that should be used to poll for the completion of the request.
In the asynchronous process that is started:
The base directory in which uploads are stored is read from Zookeeper. A UID is generated for each upload request and used to create a subdirectory of the storage directory. The files are uploaded into that subdirectory, and the user IDs of the seed audience are loaded from the seed audience files.
The implementation is carried out by the Audience-Extender service.
The top M non-seed users most similar to the seeds are filtered and stored in an HDFS file. M is loaded from Zookeeper.
Method: GET
Signature: /lookalike/status/<job-id>
Input: Job-ID, the unique ID string of the Lookalike Service request.
Output: Status of the request. Possible responses are Running and Finished. If the job is Finished, the URI of the result is included.
{ "statue":"Finished", "url": "../lookalike/extended-audience/<job-id>/exetened-audience-1p" }
This part is implemented by the Extended-Audience-Reader service. This service checks HDFS for the URL of the extended audience: it follows a predefined template to construct the URL from the job-id. If the URL exists, the job is finished and the module returns the URL of the HDFS file.
Configuration of the Lookalike Service will be stored in Zookeeper. The configuration parameters are:
Low performance of Spark operations can be caused by these factors:
Spark is about parallel computation. Too little parallelism means the job runs longer than necessary; too much parallelism requires a lot of resources. The right degree of parallelism therefore depends on the number of cores available in the cluster. A good rule of thumb is to make the number of Spark partitions in an RDD equal to the number of cores across the cluster.
There are two properties that can be used to increase the level of parallelism: spark.default.parallelism and spark.sql.shuffle.partitions. spark.sql.shuffle.partitions is used when you are working with Spark SQL or the DataFrame API.
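For example, both properties can be set when the Spark session is created; the value 200 below is only a placeholder and should reflect the cluster's core count, as discussed above.

```python
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName('lookalike-pipeline')
         .config('spark.default.parallelism', 200)     # RDD operations
         .config('spark.sql.shuffle.partitions', 200)  # DataFrame/SQL shuffles
         .getOrCreate())
```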
The right level of parallelism means that each partition fits into the memory of a single node. To achieve the right level of parallelism, follow these steps:
a. Identify the right amount of memory for each executor.
b. Partition the data so that each partition fits into the memory of a node.
c. Use the right number of executors.
d. Respect partitions in queries.
Data locality can have a major impact on the performance of Spark jobs. If data and the code that operates on it are together, then computation tends to be fast. But if code and data are separated, one must move to the other. Typically it is faster to ship serialized code from place to place than a chunk of data because code size is much smaller than data. Spark builds its scheduling around this general principle of data locality.
Calling groupBy(), groupByKey(), reduceByKey(), join() and similar functions on a dataframe shuffles data between multiple executors and even machines, and finally repartitions the data into 200 partitions by default. PySpark sets this default of 200 shuffle partitions through the spark.sql.shuffle.partitions configuration.
The project was run on a Spark 2.3 cluster with Java 8.
The Hadoop cluster has 600 GB of memory and 200 V-cores. The following command was used for each step of the pipeline.
spark-submit --master yarn --num-executors 20 --executor-cores 5 --executor-memory 8G --driver-memory 8G --conf spark.driver.maxResultSize=5g --conf spark.hadoop.hive.exec.dynamic.partition=true --conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict <python-file> config.yml
This command engages 50% of the cluster (110 V-Cores) to carry out the operation.
The following is the elapsed time for each step of the pipeline.
|STEP|INPUT TABLE NAME|TABLE SIZE (RECORDS)|PARTITIONS|ELAPSED|
|:---|:---:|:---:|:---:|:---|
|main_clean.py|ads_cleanlog_0520|1,036,183|NONE|51 mins, 18 sec|
||ads_showlog_0520|44,946,000|||
||ads_persona_0520|380,000|||
|main_logs.py|lookalike_02242021_clicklog|251,271|DAY, DID|4 mins, 41 sec|
||lookalike_02242021_showlog|12,165,993|||
|main_trainready.py|lookalike_02242021_logs|12,417,264|DAY, DID|15 mins, 0 sec|
One way to find a bottleneck is to measure the elapsed time for an operation.
Use the following code after a specific operation to measure the elapsed time.
import timeit

def get_elapsed_time(df):
    start = timeit.default_timer()
    df.take(1)
    end = timeit.default_timer()
    return end - start
For example, in the following PySpark code, get_elapsed_time(df) is called in two different places. Note that the time measurement covers everything from the beginning of the code up to the point where get_elapsed_time(df) is called.
trainready_table_temp
batched_round = 1
for did_bucket in range(did_bucket_num):
    command = """SELECT * FROM {} WHERE did_bucket= '{}' """
    df = hive_context.sql(command.format(trainready_table_temp, did_bucket))
    df = collect_trainready(df)
    print(get_elapsed_time(df))

    df = build_feature_array(df)
    print(get_elapsed_time(df))

    for i, feature_name in enumerate(['interval_starting_time', 'interval_keywords',
                                      'kwi', 'kwi_show_counts', 'kwi_click_counts']):
        df = df.withColumn(feature_name, col('metrics_list').getItem(i))

    # Add did_index
    df = df.withColumn('did_index', monotonically_increasing_id())
    df = df.select('age', 'gender', 'did', 'did_index',
                   'interval_starting_time', 'interval_keywords',
                   'kwi', 'kwi_show_counts', 'kwi_click_counts', 'did_bucket')

    mode = 'overwrite' if batched_round == 1 else 'append'
    write_to_table_with_partition(df, trainready_table, partition=('did_bucket'), mode=mode)
    batched_round += 1

return
To run the application test, run.sh should be executed. Running it launches four spark-submit commands one after another.
spark-submit --executor-memory 16G --driver-memory 24G --num-executors 16 --executor-cores 5 --master yarn --conf spark.driver.maxResultSize=8g seed_user_selector.py config.yml "29" ;
spark-submit --executor-memory 16G --driver-memory 24G --num-executors 16 --executor-cores 5 --master yarn --conf spark.driver.maxResultSize=8g score_generator.py config.yml ;
spark-submit --executor-memory 16G --driver-memory 24G --num-executors 16 --executor-cores 5 --master yarn --conf spark.driver.maxResultSize=8g distance_table_list.py config.yml ;
spark-submit --executor-memory 16G --driver-memory 24G --num-executors 16 --executor-cores 5 --master yarn --conf spark.driver.maxResultSize=8g validation.py config.yml "29";
A brief description of run.sh is as follows:
a. The first command in run.sh takes config.yml and a keyword index as arguments, creates a list of seed users, and writes it to a Hive table. The number of seed users is configurable and can be changed.
b. The second command takes config.yml as an argument and the train-ready table as input. score_generator.py sends the instances to the REST API and writes the responses to the score table.
c. The third command takes the config file as an argument and the score table as input, and creates a distance table.
d. The last command in run.sh takes the config file and a keyword index as arguments. validation.py calculates the number of clicks among the lookalike extended users for the specific keywords and compares it with the number of clicks in a random selection.