Update lookalike application
diff --git a/Model/lookalike-model/doc/SDD.md b/Model/lookalike-model/doc/SDD.md
deleted file mode 100644
index a757cac..0000000
--- a/Model/lookalike-model/doc/SDD.md
+++ /dev/null
@@ -1,679 +0,0 @@
-# Lookalike Model and Services
-
-
-
-## Background
-
-In Advertisement, lookalike models are used to build larger audiences from original audience (seeds). The larger audience reflects the characteristics of the seeds. This helps advertiser to reach a large number of new prospects.
-
-
-
-## System Diagram
-
-
-
-### Lookalike Data Preparation
-
-```mermaid
-graph TD
-    DB1[("Persona Table")]-->DP["Data Preparation"]
-    DB2[("Click Table")]-->DP
-    DB3[("Show Table")]-->DP
-    DB4[("Keyword Table")]-->DP
-      
-    DP-->TR[("Train-Ready Table")]
-    TR-->TF["TFRecords Generator"]
-    TF-->Trainer["Lookalike Model Trainer"]
-    
-    Trainer-->Model{{"Lookalike Model"}}
-    style Model fill:#f9f,stroke:#333,stroke-width:4px
-```
-
-### Lookalike Post Process
-
-```mermaid
-graph TD
-    style Model fill:#f9f,stroke:#333,stroke-width:4px
-		
-   	Model{{"Lookalike Model"}} ---|REST CALL| SG["Score Generator"]
-   	DB4[("Keyword Table")] -->SG
-   	DB1[("Persona Table")] -->SG
-   	SG-->SCT[("Score Table")]
-```
-
-### Lookalike Services
-
-```mermaid
-graph LR
- 	user("DMP Console")--> AH(["API Handler"])
-    style user fill:#f9f,stroke:#333,stroke-width:4px
-    
-    AH-->EAR[["Extended-Audience-Reader"]]
-    EAR--->HDFS[("HDFS Extended Audience")]
-    AH-->AE[["Audience-Extender"]]
-    ST[("Score Table")]-->AE
-    AE-->HDFS  
-```
-
-## Data Preparation
-
-### Overview
-
-Data needs to be cleaned and transformed prior training. The data that is used for building a lookalike model has to posses the following characteristics:
-
-+ User ID
-+ User interests
-
-The Lookalike model targets to setup the correlation between user and ads based on logged user behavior.  The log data got from product is usually noisy due to various reasons, such as: unstable slot_id, redundant records or missing data. The data pre-processing is then required to clean original logs data before the data can be feed into the model trainer. Cleaning steps include transforming the logs data to new data format, and generating new data tables. This document of the data frame transformations for logs data pre-processing is designed for clarifying the whole processing steps on logs data and guiding the coding logic on the data pre-processing of DIN model. 
-
-
-
-### Input data
-
-The input tables for Lookalike model are the following log files.
-
-+ Persona table
-
-```python
->>> df=sql('select * from ads_persona_0520')
->>> df.count()
-380000
->>> t1=df.take(1)
->>> m=map(lambda x:(x[0],x[1],t1[0][x[0]]),df.dtypes)
->>> for _ in m:
-...     print(_)
-...
-### USED Fields
-('did', 'string', u'7eb9401a7ba0e377ad2fd81315b39da088b2943d69ff89c3ba56149d0a166d11')
-('gender_new_dev', 'string', u'0')
-('forecast_age_dev', 'string', u'3')
-###
-```
-
-+ Show-Log table
-```python
-df=sql('select * from ads_showlog_0520')
-
-### USED Fields
-('did', 'string', u'fd939b9efd2d9c512bcd360c38d32eedf03f6a11a935295c817f8e61caa2f049')
-('adv_id', 'string', u'40023087')
-('adv_type', 'string', u'native')
-('slot_id', 'string', u'l03493p0r3')
-('spread_app_id', 'string', u'C10374976')
-('device_name', 'string', u'MAR-AL00')
-('net_type', 'string', u'4G')
-('adv_bill_mode_cd', 'string', u'CPC')
-('show_time', 'string', u'2020-01-07 16:22:18.129')
-###
-```
-
-+ Click-Log table
-
-```python
-df=sql('select * from ads_clicklog_0520')
-
-### USED Fields
-('did', 'string', u'fd939b9efd2d9c512bcd360c38d32eedf03f6a11a935295c817f8e61caa2f049')
-('adv_id', 'string', u'40023087')
-('adv_type', 'string', u'native')
-('slot_id', 'string', u'l03493p0r3')
-('spread_app_id', 'string', u'C10374976')
-('device_name', 'string', u'MAR-AL00')
-('net_type', 'string', u'4G')
-('adv_bill_mode_cd', 'string', u'CPC')
-('click_time', 'string', u'2020-01-07 16:22:18.129')
-###
-```
-
-
-
-### Clean Log Data
-
-1. **Clean Persona Table**
-
-	This operation is to clean persona table to:
-
-	+ Have distinct did, gender and age.
-	+ Have did associated to only one age and gender
-
-2. **Clean Show-log and Click-log Tables**
-
-	The cleaning operation is performed in batches.
-
-	Every batch is cleaned according to following policies:
-
-	+ Filter right slot-ids and add media-category
-	+ Add gender and age from persona table to each record of log
-	+ Add keyword to each row by using spread-app-id
-
-```python
->>> sql('show partitions lookalike_02022021_limited_clicklog').show(100,False)
-+---------------------------+
-|partition                  |
-+---------------------------+
-|day=2019-12-19/did_bucket=0|
-|day=2019-12-19/did_bucket=1|
-|day=2019-12-20/did_bucket=0|
-|day=2019-12-20/did_bucket=1|
-+---------------------------+
-
->>> sql('show partitions lookalike_02022021_limited_showlog').show(100,False)
-+---------------------------+
-|partition                  |
-+---------------------------+
-|day=2019-12-19/did_bucket=0|
-|day=2019-12-19/did_bucket=1|
-|day=2019-12-20/did_bucket=0|
-|day=2019-12-20/did_bucket=1|
-
->>> df=sql('select * from lookalike_02022021_limited_showlog')
->>> ...
->>> 
-('spread_app_id', 'string', u'C10608')
-('did', 'string', u'029d383c0cfd36dbe45ed42b2b784f06c04a6554b61c3919ea7b3681bc0fda39')
-('adv_id', 'string', u'106559')
-('media', 'string', u'native')
-('slot_id', 'string', u'l2d4ec6csv')
-('device_name', 'string', u'LIO-AN00')
-('net_type', 'string', u'4G')
-('price_model', 'string', u'CPC')
-('action_time', 'string', u'2019-12-19 19:23:47.894')
-('media_category', 'string', u'Huawei Reading')
-('gender', 'int', 1)
-('age', 'int', 4)
-('keyword', 'string', u'info')
-('keyword_index', 'int', 14)
-('day', 'string', u'2019-12-19')
-('did_bucket', 'string', u'1')
-```
-
-
-### Partitions
-
-Log tables should be **partitioned by DAY and DID **.
-
-
-
-### Log Unification
-
-The log unification process is performed in batches. 
-
-It processes data for each user partition separately and uses **load_logs_in_minutes** to load specific amount of log. 
-
-Here is a pseudocode for the process.
-
-```pseudocode
-for each user-partition:
-	start-time <- config
-	finish-time <- config
-	batch-size <- config
-	while start-time > finish-time:
-		batch-finish-time = start-time+batch-size
-		read logs between start-time and batch-finish-time
-		union logs
-		save or append logs in partitioned tables ('day','user-did')
-		start-time += batch-finish-time
-```
-
-Every batch of logs are unified according to following policies:
-
-+ Add is_click=1 to click log
-+ Add is_click=0 to show log
-+ Union show and click logs
-+ Add **interval_starting_time** 
-	
-	> This value shows the start_time of an interval. 
-	>
-	> For example for the following time series and for interval_time_in_seconds=5
-	>
-	> [100,101,102,103,104,105,106, 107,108,109]
-	>
-	> the interval_starting_time is
-	>
-	> [100,100,100,100,100,105,105,105,105,105]
-	
-
-Here is a sample of this stage output.
-
-```python
->>> sql('show partitions lookalike_02022021_limited_logs').show(100,False)
-+---------------------------+
-|partition                  |
-+---------------------------+
-|day=2019-12-19/did_bucket=0|
-|day=2019-12-19/did_bucket=1|
-|day=2019-12-20/did_bucket=0|
-|day=2019-12-20/did_bucket=1|
-+---------------------------+
-
->>> df=sql('select * from lookalike_02022021_limited_logs')
->>> ...
-
-('did', 'string', u'02842b445b7779b9b6cd86a7cde292f15cd10128a8a80f19bea7c310c49160df')
-('is_click', 'int', 0)
-('action_time', 'string', u'2019-12-19 13:04:14.708')
-('keyword', 'string', u'video')
-('keyword_index', 'int', 29)
-('media', 'string', u'native')
-('media_category', 'string', u'Huawei Video')
-('net_type', 'string', u'WIFI')
-('gender', 'int', 0)
-('age', 'int', 5)
-('adv_id', 'string', u'40014545')
-('interval_starting_time', 'int', 1576713600)
-('action_time_seconds', 'int', 1576789454)
-('day', 'string', u'2019-12-19')
-('did_bucket', 'string', u'0') 
-```
-
-### Build Train-Ready Data
-
-TODO
-
-
-
-## Model Training
-
-The model trainer reads records from Tfrecords and train the model. The built model is saved in HDFS.
-
-
-
-## Model Evaluation
-
-The built model is compared against the previous model. If the new model performs better than the old model then it gets deployed and its name is stored in Zookeeper.
-
-
-
-## Model Deployment
-
-Here are the steps to deploy the model.
-
-	1.	Pull a serving image : docker pull tensorflow/serving:latest-gpu
-	2.	Put the saved model in .../lookalike/<date>/tfserving directory. 
-	3.	Run the serving image: 
-
-```bash
-docker run -p 8501:8501 --mount type=bind,source=/tmp/tfserving,target=/models/<model_name> -e MODEL_NAME=<model_name> -t tensorflow/serving &
-```
-
-
-
-## Post Process
-
-This part consists of one process "Score Table Generator". 
-
-
-
-### Generate Score Table
-This module uses Keyword Table, Persona Table and Lookalike Model to generate Score Table. The scores are **normalized**.
-
-The score generator runs whenever a new lookalike model is generated.  The name of the score table is stored in Zookeeper to be used by API services.
-
-```mermaid
-graph TD
-    style Model fill:#f9f,stroke:#333,stroke-width:4px
-		
-   	Model{{"Lookalike Model"}} ---|REST CALL| SG["Score Generator"]
-   	DB4[("Keyword Table")] -->SG
-   	DB1[("Persona Table")] -->SG
-   	SG--->|Normalized Scores|SCT[("Score Table")]
-```
-
-The score table has the following schema. 
-
-|| Keyword-Score|
-|:-------------| :------------: |
-|User-1| {kw1:norm-score-11, kw2:norm-score-12, kw3:norm-score-13} |
-|User-2| {kw1:norm-score-21, kw2:norm-score-22, kw3:norm-score-23} |
-|User-3| {kw1:norm-score-31, kw2:norm-score-32, kw3:norm-score-33} |
-
-
-
-## API Services
-
-
-
-### Abstract
-
-This part describes the requirements, API, and high-level architecture for the Lookalike services.
-
-
-
-### List of Abbreviations
-
-| Acronym | Meaning           |
-| ------- | ----------------- |
-| ES      | Elasticsearch     |
-| LS      | Lookalike Service |
-| SG      | Score Generator   |
-| ST      | Score Table       |
-
-
-
-### Overview
-
-The Lookalike Service is a real-time service whose purpose is to take an existing seed audience of users and return a group of additional users who have similar interests.  This provides an expanded audience to which advertisers can target their ad campaigns.  
-
-Experienced advertisers will develop an understanding of the audience that they want to target with their campaigns.  This audience will be the users where the advertiser experiences the highest conversion rate for their metric of success.  However, this approach can exclude a large number of their potential audience that would be similarly receptive to their advertising campaign.  The Lookalike Service is intended to help advertisers expand the audience of their campaigns to users that are similar to their existing audiences that they would not otherwise know to target in their campaigns.
-
-The Lookalike Service builds on the work done on the DIN model.  From the DIN model, a correlation can be developed between users and topic keywords by the Score Generator.  The Score Generator is an offline service that draws data from the DIN model and builds a score for each user for each of a given list of keywords.  The score is a numeric metric for the affinity of this user for the topic represented by the given keyword.  The scores for all users is stored in the Score Table.
-
-
-
-##### System Entity Diagram
-
-```mermaid
-graph TD
- 	user("DMP Console")--> A(["API Handler"])
-    style user fill:#f9f,stroke:#333,stroke-width:4px
-    
-    A-->I[["Extended-Audience-Reader"]]
-    I---J[("HDFS Extended Audience")]
-    
-    A-->|Asynchronism|A1[["Audience-Extender"]]
-    A1-->A2[["Queue"]]
-    A2-->B[["Audience-Extender-Worker"]]
-    
-    B-->|step 1|H["Seed-Data-Reader"]
-    B-->|step 2|C["Seed-Clustering"]
-    B-->|step 3|D["Similarity-Table-Generator"]
-    B-->|step 4|F["User-Selection"]
-    F---J
-   
-    D---G[("Normalized User-Keyword Score Table")]
-    H---G
-```
-
-### Seed Data Reader
-
-This module filters Score Table based on seeds and extracts keyword score for each seed user. 
-
-It also builds a keyword reference list which corresponds to score list in the result dataframe.
-
-The result dataframe has the following structure.
-
-
-|| Keyword-Score|
-|:-------------| :------------: |
-|Seed-user-1| [norm-score-11, norm-score-12, norm-score-13] |
-|Seed-user-2| [norm-score-21, norm-score-22, norm-score-23] |
-|Seed-user-3| [norm-score-31, norm-score-32, norm-score-33] |
-
-
-
-
-### Seed Clustering
-
-This module is to cluster seeds. The output of this subsystem is a dataframe of seed-clusters and keyword scores. The number of the produced clusters shows how well the seeds share a same feature. If the number of the clusters ends up very high it means that the system is probably not able to extend seeds in a productive way.
-
-|| Keyword-Score|
-|:-------------| :------------: |
-|Seed-cluster-1| [score-11, score-12, score-13] |
-|Seed-cluster-2| [score-21, score-22, score-23] |
-|Seed-cluster-3| [score-31, score-32, score-33] |
-
-
-
-### Similarity Generator
-
-This module produces user similarity dataframe.  The result dataframe has the following schema.
-
-|| top-N-similarity-scores|final-score|
-|:-------------| :------------: |
-|user-1| [similarity-score-11, similarity-score-12, similarity-score-13] |final-score-1|
-|user-2| [similarity-score-21, similarity-score-22, similarity-score-23] |final-score-2|
-|user-3| [similarity-score-31, similarity-score-32, similarity-score-33] |final-score-3|
-
-
-
-Similarity Generator has iterative process to produce top-N-similarity-scores. The following is the algorithm flow chart. 
-
-```mermaid
-graph TD
-  A[/Get user score df/]
-  C[Add empty top-N-similarity-scores column]
-  C1[For each seed batch]
-  F[Add seed-score-array column]
-  F1[Add similarity-score-array column]
-  G[Upate top-N-similarity-scores column]
-  G1[Remove seed-score-array, similarity-score-array columns]
-  D{More seed batches?}
-  H[Add final score column from top-N-similarity-scores]
-  Start([Start])-->A-->C-->C1-->F-->F1-->G-->G1-->D-->C1
-  D-->H-->E([End])
-```
-
-
-
-### User Selector
-
-This module selects the top M users in similarity dataframe and saves the result in HDFS. The result HDFS file carries the job-id in the path.
-
-
-
-### APIs
-
-Use of the Lookalike Service is a multi-step process due to the size of the potential audiences and the scale of processing involved in generating the lookalike audiences.  
-
-Due to the scale of the computation required when generating the similarities between users (millions x 100millions), the processing of the Lookalike Service is divided into a REST endpoint to initiate the processing and a REST endpoint to poll the service on the completion of the processing.
-
-
-```mermaid
-sequenceDiagram
-    DMP Console->>+Service: Upload seed audience
-    Service-->>-DMP Console: Job ID
-    DMP Console->>+Service: Query process with Job ID
-    Service-->>-DMP Console: Running
-    DMP Console->>+Service: Query process with Job ID
-    Service-->>-DMP Console: Completed + result file URI 
-```
-
-#### Extend Audience API
-
-```API
-Method: POST
-Signature: /lookalike/extend-audience
-Input:
-	Multipart list of seeds (user dids)
-Output:
-	Job reference ID
-```
-+ Sample output
-```json
-{
-    "job-id":"3746AFBBF63"
-}
-```
-
-##### Implementation
-
-This is an asynchronous call that initiates the processing of a lookalike audience and returns with a generated UID that should be used to poll for the completion of the request. 
-
-In the asynchronous process that is started: 
-
-The base directory that uploads are stored is read from Zookeeper. A UID will be generated for each upload request.  The UID name will be used to create a subdirectory of the storage directory.  The files will be uploaded into the subdirectory.  The user IDs of the seed audience are loaded from the seed audience files.
-
-The implementation is carried out by **Audience-Extender** service.
-
-The top M most similar non-seed users to seeds are filtered and stored in HDFS file. M is loaded from Zookeeper.
-
-
-
-##### Query Process Completion
-
-```API
-Method: GET
-Signature: /lookalike/status/<job-id>
-Input:
-	Job-ID which is unique ID string for the Lookalike Service request.
-Output:
-	Status of the request. Possible responses can be Running and Finished. If job is Finished then the URI of result is presented.
-```
-
-+ Sample output
-```json
-{
-    "statue":"Finished",
-    "url": "../lookalike/extended-audience/<job-id>/exetened-audience-1p"
-}
-```
-
-##### Implementation
-
-This part is implemented by **Extended-Audience-Reader** service. This service checks HDFS for the URL of the extended audience. The service follows predefined template to construct url from job-id. If the url exits, it means job is finished and the module returns the url of the HDFS file.
-
-
-
-##### Configuration
-
-Configuration of the Lookalike Service will be stored in Zookeeper.  The configuration parameters are:
-
-- Hive table name for score table
-- Number of clusters to group existing audience
-- Number of highest similarities to include in average
-- Percentage of user extension
-
-
-
-# Spark Environment Tuning
-
-Low performance on Spark operations can caused by these factors:
-
-1. Level of Parallelism
-2. Data Locality
-
-
-
-### Level of Parallelism
-
-Spark is about parallel computations. Too low parallelism means the job will be running longer too high parallelism would require a lots of resource. So defining the degree of parallelism depends on the number of cores available in the cluster. **Best way to decide a number of spark partitions in an RDD is to make the number of partitions equal to the number of cores over the cluster.**
-
-There are 2 properties which can be used to increase the level of parallelism -
-
-```
-spark.default.parallelism
-spark.sql.shuffle.partitions
-```
-
-```spark.sql.shuffle.partitions``` is used when you are dealing with spark SQL or dataframe API.
-
-
-
-A right level of Parallelism means that a partition can be fit into a memory of one node. To achieve right level of Parallelism follow these steps:
-
-> a. Identify right about of memory for each executer.
-> b. Partition data so that each partition can be fit into memory of a node.
-> c. Use right number of executers.
-> d. Respect partitions in queries
-
-
-
-### Data Locality
-
-Data locality can have a major impact on the performance of Spark jobs. If data and the code that operates on it are together, then computation tends to be fast. But if code and data are separated, one must move to the other. Typically it is faster to ship serialized code from place to place than a chunk of data because code size is much smaller than data. Spark builds its scheduling around this general principle of data locality.
-
-Calling `groupBy()`, `groupByKey()`, `reduceByKey()`, `join()` and similar functions on dataframe results in shuffling data between multiple executors and even machines and finally repartitions data into 200 partitions by default. Pyspark default defines shuffling partition to 200 using `spark.sql.shuffle.partitions` configuration.
-
-
-
-### Experiment 
-
-The project was run on the spark cluster version 2.3 with Java 8.
-
-
-
-#### Spark Environment Settings
-
-The Hadoop cluster has the '600GB' Memory and '200' V-Cores.
-The following command was used for each step of the pipeline.
-
-```shell
-spark-submit --master yarn --num-executors 20 --executor-cores 5 --executor-memory 8G --driver-memory 8G --conf spark.driver.maxResultSize=5g --conf spark.hadoop.hive.exec.dynamic.partition=true --conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict <python-file> config.yml
-```
-
-This command engages 50% of the cluster (110 V-Cores) to carry out the operation.
-
-
-
-#### Elapsed Time
-
-The following is the elapsed time for each step of the pipeline.
-
-
-
-|STEP|INPUT TABLE NAME|TABLE SIZE RECORDS|PARTITIONS|ELAPSED|
-|:-------------| :------------: |:------------: |-------------- |
-|main_clean.py  | ads_cleanlog_0520 |1,036,183|NONE|51mins, 18sec|
-|				| ads_showlog_0520 |44,946,000||
-|				| ads_persona_0520 |380,000||
-|main_logs.py|lookalike_02242021_clicklog|251,271|DAY,DID|4mins, 41sec|
-||lookalike_02242021_showlog|12,165,993||
-|main_trainready.py|lookalike_02242021_logs|12,417,264|DAY,DID|15mins, 0sec|
-
-
-
-#### Debugging for Performance Bottlenecks
-
-One way to find a bottleneck is to measure the elapsed time for an operation.
-
-Use the following code after a specific operation to measure the elapsed time.
-
-```python
-import timeit
-def get_elapsed_time(df):
-	start = timeit.default_timer()
-    df.take(1)
-    end = timeit.default_timer()
-    return end-start
-```
-
-For example in the following pyspark code, the `get_elapsed_time(df)` is called in 2 different places.  Note, that the time measurement is from the beginning of the code up to the place where`get_elapsed_time(df)` is called.
-
-```spark
- trainready_table_temp
-    batched_round = 1
-    for did_bucket in range(did_bucket_num):
-        command = """SELECT * 
-                        FROM {} 
-                        WHERE 
-                        did_bucket= '{}' """
-        df = hive_context.sql(command.format(trainready_table_temp, did_bucket))
-        df = collect_trainready(df)
-        print(get_elapsed_time(df))
-        
-        df = build_feature_array(df)
-        print(get_elapsed_time(df))
-        
-        for i, feature_name in enumerate(['interval_starting_time', 'interval_keywords', 'kwi', 'kwi_show_counts', 'kwi_click_counts']):
-            df = df.withColumn(feature_name, col('metrics_list').getItem(i))
-
-        # Add did_index
-        df = df.withColumn('did_index', monotonically_increasing_id())
-        df = df.select('age', 'gender', 'did', 'did_index', 'interval_starting_time', 'interval_keywords',
-                       'kwi', 'kwi_show_counts', 'kwi_click_counts', 'did_bucket')
-
-        mode = 'overwrite' if batched_round == 1 else 'append'
-        write_to_table_with_partition(df, trainready_table, partition=('did_bucket'), mode=mode)
-        batched_round += 1
-
-    return
-```
-
-#### Application testing
-
-To run the application test run.sh should be run. By running it, 4 lines of code would be run one after each other.
-
-```shell
-spark-submit --executor-memory 16G --driver-memory 24G  --num-executors 16 --executor-cores 5 --master yarn --conf spark.driver.maxResultSize=8g seed_user_selector.py config.yml "29" ;
-spark-submit --executor-memory 16G --driver-memory 24G  --num-executors 16 --executor-cores 5 --master yarn --conf spark.driver.maxResultSize=8g score_generator.py config.yml ;
-spark-submit --executor-memory 16G --driver-memory 24G  --num-executors 16 --executor-cores 5 --master yarn --conf spark.driver.maxResultSize=8g distance_table_list.py config.yml ;
-spark-submit --executor-memory 16G --driver-memory 24G  --num-executors 16 --executor-cores 5 --master yarn --conf spark.driver.maxResultSize=8g validation.py config.yml "29";
-```
-
-A brief description for run.sh is as following:
-
-> a. The first line of the code in the run.sh gets the config.yml and keyword index as an argument, create a list of seed users and write it to a hive table. The number of seed users is configurable and can be changed.  
-> b. The second line gets the config.yml in the argument and trainready table as an input. score_generator.py send the instances to the Rest API and write the responses to the score table.  
-> c. The third line gets the config file in the argument and score table as an input and create a distance table.  
-> d. The last line in the run.sh file gets config file and keywords index in the argument. The validation.py is calculating the number of clicks among the lookalike extended users, in the specific keywords and compare it with the number of click in the random selection.
-
diff --git a/Model/lookalike-model/lookalike_model/application/__init__.py b/Model/lookalike-model/lookalike_model/application/__init__.py
deleted file mode 100644
index 4fd2638..0000000
--- a/Model/lookalike-model/lookalike_model/application/__init__.py
+++ /dev/null
@@ -1,15 +0,0 @@
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-
-#  http://www.apache.org/licenses/LICENSE-2.0.html
-
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-#  limitations under the License.
diff --git a/Model/lookalike-model/lookalike_model/application/config.yml b/Model/lookalike-model/lookalike_model/application/config.yml
deleted file mode 100644
index 239ea31..0000000
--- a/Model/lookalike-model/lookalike_model/application/config.yml
+++ /dev/null
@@ -1,14 +0,0 @@
-input:
-    log_table : "lookalike_03042021_logs"
-    did_table: "lookalike_03042021_trainready"
-    keywords_table: "din_ad_keywords_09172020"
-    test_table: "lookalike_trainready_jimmy_test"
-    din_model_tf_serving_url: "http://10.193.217.105:8506/v1/models/lookalike3:predict"
-    din_model_length: 20
-    seeduser_table : "lookalike_seeduser"
-    number_of_seeduser: 1000
-    extend: 2000
-output:
-    did_score_table: "lookalike_score_01112021"
-    did_score_table_norm:  "lookalike_score_norm_01112021"
-    similarity_table: "lookalike_similarity"
\ No newline at end of file
diff --git a/Model/lookalike-model/lookalike_model/application/distance_table.py b/Model/lookalike-model/lookalike_model/application/legacy_files/distance_table.py
similarity index 98%
rename from Model/lookalike-model/lookalike_model/application/distance_table.py
rename to Model/lookalike-model/lookalike_model/application/legacy_files/distance_table.py
index fcb9792..e82c018 100644
--- a/Model/lookalike-model/lookalike_model/application/distance_table.py
+++ b/Model/lookalike-model/lookalike_model/application/legacy_files/distance_table.py
@@ -42,10 +42,13 @@
     return _udf_distance
 
 def run(hive_context, cfg):
-    # load dataframes
-    lookalike_loaded_table_norm = cfg['output']['gucdocs_loaded_table_norm']
+    
+    # input tables
     keywords_table = cfg["input"]["keywords_table"]
     seeduser_table = cfg["input"]["seeduser_table"]
+    lookalike_loaded_table_norm = cfg['output']['gucdocs_loaded_table_norm']
+
+    # output dataframes
     lookalike_score_table = cfg["output"]["score_table"]
 
     command = "SELECT * FROM {}"
diff --git a/Model/lookalike-model/lookalike_model/application/distance_table_list.py b/Model/lookalike-model/lookalike_model/application/legacy_files/distance_table_list.py
similarity index 79%
rename from Model/lookalike-model/lookalike_model/application/distance_table_list.py
rename to Model/lookalike-model/lookalike_model/application/legacy_files/distance_table_list.py
index 89cb4e8..2217994 100644
--- a/Model/lookalike-model/lookalike_model/application/distance_table_list.py
+++ b/Model/lookalike-model/lookalike_model/application/legacy_files/distance_table_list.py
@@ -21,6 +21,21 @@
 import argparse
 from pyspark.sql.functions import udf
 import time
+import math
+
+'''
+spark-submit --executor-memory 16G --driver-memory 24G  --num-executors 16 --executor-cores 5 --master yarn --conf spark.driver.maxResultSize=8g distance_table_list.py config.yml
+'''
+
+
+def euclidean(l1):
+    def _euclidean(l2):
+        list = []
+        for item in l1:
+            similarity = 1 - (math.sqrt(sum([(item[i]-l2[i]) ** 2 for i in range(len(item))]))/math.sqrt(len(item)))
+            list.append(similarity)
+        return list
+    return _euclidean
 
 
 def dot(l1):
@@ -33,9 +48,11 @@
     return _dot
 
 
-
 def ux(l1):
-    _udf_similarity = udf(dot(l1), ArrayType(FloatType()) )
+    if alg == "euclidean":
+        _udf_similarity = udf(euclidean(l1), ArrayType(FloatType()))
+    if alg =="dot":
+        _udf_similarity = udf(dot(l1), ArrayType(FloatType()))
     return _udf_similarity
 
 
@@ -65,8 +82,9 @@
 udf_mean = udf(_mean, FloatType())
 
 def run(hive_context, cfg):
-    # load dataframes
-    lookalike_score_table_norm = cfg['output']['did_score_table_norm']
+
+    ## load dataframes
+    lookalike_score_table_norm = cfg['output']['score_norm_table']
     keywords_table = cfg["input"]["keywords_table"]
     seeduser_table = cfg["input"]["seeduser_table"]
     lookalike_similarity_table = cfg["output"]["similarity_table"]
@@ -78,7 +96,10 @@
 
 
     #### creating a tuple of did and kws for seed users
-    df = df.withColumn('kws_norm_list', udf_tolist(col('kws_norm')))
+    if alg == "dot":
+        df = df.withColumn('kws_norm_list', udf_tolist(col('kws_norm')))
+    if alg == "euclidean":
+        df = df.withColumn('kws_norm_list', udf_tolist(col('kws')))
     df_seed_user = df_seed_user.join(df.select('did','kws_norm_list'), on=['did'], how='left')
     seed_user_list = df_seed_user.select('did', 'kws_norm_list').collect()
 
@@ -115,6 +136,8 @@
     sc.setLogLevel('WARN')
     hive_context = HiveContext(sc)
 
+    ## select similarity algorithm
+    alg = cfg["input"]["alg"]
     run(hive_context=hive_context, cfg=cfg)
     sc.stop()
     end = time.time()
diff --git a/Model/lookalike-model/lookalike_model/application/pipeline/config.yml b/Model/lookalike-model/lookalike_model/application/pipeline/config.yml
new file mode 100644
index 0000000..7f8cb1d
--- /dev/null
+++ b/Model/lookalike-model/lookalike_model/application/pipeline/config.yml
@@ -0,0 +1,31 @@
+score_generator:
+    input:
+        log_table : "lookalike_03042021_logs"
+        did_table: "lookalike_03042021_trainready"
+        keywords_table: "din_ad_keywords_09172020"
+        test_table: "lookalike_trainready_jimmy_test"
+        din_model_tf_serving_url: "http://10.193.217.105:8506/v1/models/lookalike3:predict"
+        din_model_length: 20
+        seeduser_table : "lookalike_seeduser"
+        number_of_seeduser: 1000
+        extend: 2000
+        alg: "euclidean" ##### currently just support "euclideand" and "dot"
+    output:
+        did_score_table: "lookalike_score_01112021"
+        score_norm_table:  "lookalike_score_norm_01112021"
+        
+score_vector:
+    keywords_table: "din_ad_keywords_09172020"
+    score_norm_table:  "lookalike_score_norm_01112021"   
+    score_vector_table: "lookalike_score_vector_01112021" 
+    did_bucket_size: 2
+    did_bucket_step: 2
+score_vector_rebucketing:
+    did_bucket_size: 2
+    did_bucket_step: 2
+    alpha_did_bucket_size: 1000
+    score_vector_alpha_table: 'lookalike_score_vector_alpha_01112021'
+top_n_similarity:
+    alpha_did_bucket_step: 100
+    top_n: 100
+    similarity_table: "lookalike_similarity_01112021"
\ No newline at end of file
diff --git a/Model/lookalike-model/lookalike_model/application/pipeline/run.sh b/Model/lookalike-model/lookalike_model/application/pipeline/run.sh
new file mode 100644
index 0000000..6f2c804
--- /dev/null
+++ b/Model/lookalike-model/lookalike_model/application/pipeline/run.sh
@@ -0,0 +1,14 @@
+#!/bin/bash
+
+spark-submit --executor-memory 16G --driver-memory 24G  --num-executors 16 --executor-cores 5 --master yarn --conf spark.driver.maxResultSize=8g seed_user_selector.py config.yml "29"
+
+spark-submit --executor-memory 16G --driver-memory 24G  --num-executors 16 --executor-cores 5 --master yarn --conf spark.driver.maxResultSize=8g score_generator.py config.yml
+
+spark-submit --master yarn --num-executors 20 --executor-cores 5 --executor-memory 8G --driver-memory 8G --conf spark.driver.maxResultSize=5g --conf spark.hadoop.hive.exec.dynamic.partition=true --conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict score_vector_table.py config.yml
+
+spark-submit --master yarn --num-executors 20 --executor-cores 5 --executor-memory 8G --driver-memory 8G --conf spark.driver.maxResultSize=5g --conf spark.hadoop.hive.exec.dynamic.partition=true --conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict score_vector_rebucketing.py config.
+
+spark-submit --master yarn --num-executors 20 --executor-cores 5 --executor-memory 16G --driver-memory 16G --conf spark.driver.maxResultSize=5g --conf spark.hadoop.hive.exec.dynamic.partition=true --conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict top_n_similarity_table_generator.py config.yml
+
+spark-submit --executor-memory 16G --driver-memory 24G  --num-executors 16 --executor-cores 5 --master yarn --conf spark.driver.maxResultSize=8g validation.py config.yml "29"
+
diff --git a/Model/lookalike-model/lookalike_model/application/score_generator.py b/Model/lookalike-model/lookalike_model/application/pipeline/score_generator.py
similarity index 77%
rename from Model/lookalike-model/lookalike_model/application/score_generator.py
rename to Model/lookalike-model/lookalike_model/application/pipeline/score_generator.py
index 16a0220..5248b10 100644
--- a/Model/lookalike-model/lookalike_model/application/score_generator.py
+++ b/Model/lookalike-model/lookalike_model/application/pipeline/score_generator.py
@@ -26,6 +26,15 @@
 import argparse
 from math import sqrt
 
+'''
+This process generates the score-norm-table with the following format.
+
+DataFrame[age: int, gender: int, did: string, did_index: bigint, 
+interval_starting_time: array<string>, interval_keywords: array<string>, 
+kwi: array<string>, kwi_show_counts: array<string>, kwi_click_counts: array<string>, 
+did_bucket: string, kws: map<string,float>, kws_norm: map<string,float>]
+
+'''
 
 
 def flatten(lst):
@@ -69,12 +78,11 @@
     return predictions
 
 
-
 def gen_mappings_media(hive_context, cfg):
     # this function generates mappings between the media category and the slots.
-    media_category_list = cfg["mapping"]["new_slot_id_media_category_list"]
+    media_category_list = cfg['score_generator']["mapping"]["new_slot_id_media_category_list"]
     media_category_set = set(media_category_list)
-    slot_id_list = cfg["mapping"]["new_slot_id_list"]
+    slot_id_list = cfg['score_generator']["mapping"]["new_slot_id_list"]
     # 1 vs 1: slot_id : media_category
     media_slot_mapping = dict()
     for media_category in media_category_set:
@@ -90,6 +98,7 @@
     df = hive_context.createDataFrame(media_slot_mapping_rows, schema)
     return df
 
+
 def normalize(x):
     c = 0
     for key, value in x.items():
@@ -100,6 +109,7 @@
         result[keyword] = value / C
     return result
 
+
 udf_normalize = udf(normalize, MapType(StringType(), FloatType()))
 
 
@@ -112,8 +122,6 @@
         self.df_did_loaded = None
         self.keyword_index_list, self.keyword_list = self.get_keywords()
 
-
-
     def get_keywords(self):
         keyword_index_list, keyword_list = list(), list()
         for dfk in self.df_keywords.collect():
@@ -143,22 +151,16 @@
 
                 return did_kw_scores
 
-
             return __helper
 
         self.df_did_loaded = self.df_did.withColumn('kws',
-                                                            udf(predict_udf(din_model_length=self.din_model_length,
-                                                                            din_model_tf_serving_url=self.din_model_tf_serving_url,
-                                                                            keyword_index_list=self.keyword_index_list,
-                                                                            keyword_list=self.keyword_list),
-                                                                MapType(StringType(), FloatType()))
-                                                            (col('did_index'), col('kwi_show_counts'),
-                                                             col('age'), col('gender')))
-
-
-
-
-
+                                                    udf(predict_udf(din_model_length=self.din_model_length,
+                                                                    din_model_tf_serving_url=self.din_model_tf_serving_url,
+                                                                    keyword_index_list=self.keyword_index_list,
+                                                                    keyword_list=self.keyword_list),
+                                                        MapType(StringType(), FloatType()))
+                                                    (col('did_index'), col('kwi_show_counts'),
+                                                     col('age'), col('gender')))
 
 
 if __name__ == "__main__":
@@ -168,23 +170,25 @@
     with open(args.config_file, 'r') as yml_file:
         cfg = yaml.safe_load(yml_file)
 
-
     sc = SparkContext.getOrCreate()
     sc.setLogLevel('WARN')
     hive_context = HiveContext(sc)
 
     # load dataframes
-    did_table, keywords_table, din_tf_serving_url, length = cfg["input"]["did_table"], cfg["input"]["keywords_table"],cfg["input"]["din_model_tf_serving_url"],cfg["input"]["din_model_length"]
+    did_table, keywords_table, din_tf_serving_url, length = cfg['score_generator']["input"]["did_table"],
+    cfg['score_generator']["input"]["keywords_table"],
+    cfg['score_generator']["input"]["din_model_tf_serving_url"],
+    cfg['score_generator']["input"]["din_model_length"]
 
     command = "SELECT * FROM {}"
     df_did = hive_context.sql(command.format(did_table))
     df_keywords = hive_context.sql(command.format(keywords_table))
-    ###### temporary adding to filter based on active keywords
-    df_keywords = df_keywords.filter( (df_keywords.keyword =="video") | (df_keywords.keyword =="shopping") | (df_keywords.keyword == "info") |
-                                      (df_keywords.keyword =="social") | (df_keywords.keyword =="reading") | (df_keywords.keyword =="travel") |
-                                      (df_keywords.keyword =="entertainment") )
-    did_loaded_table = cfg['output']['did_score_table']
-    did_score_table_norm = cfg['output']['did_score_table_norm']
+    # temporary adding to filter based on active keywords
+    df_keywords = df_keywords.filter((df_keywords.keyword == "video") | (df_keywords.keyword == "shopping") | (df_keywords.keyword == "info") |
+                                     (df_keywords.keyword == "social") | (df_keywords.keyword == "reading") | (df_keywords.keyword == "travel") |
+                                     (df_keywords.keyword == "entertainment"))
+    did_loaded_table = cfg['score_generator']['output']['did_score_table']
+    score_norm_table = cfg['score_generator']['output']['score_norm_table']
 
     # create a CTR score generator instance and run to get the loaded did
     ctr_score_generator = CTRScoreGenerator(df_did, df_keywords, din_tf_serving_url, length)
@@ -192,6 +196,6 @@
     df_did_loaded = ctr_score_generator.df_did_loaded
     df_did_loaded_norm = df_did_loaded.withColumn('kws_norm', udf_normalize(col('kws')))
 
-     # save the loaded did to hive table
+    # save the loaded did to hive table
     df_did_loaded_norm.write.option("header", "true").option(
-        "encoding", "UTF-8").mode("overwrite").format('hive').saveAsTable(did_score_table_norm)
+        "encoding", "UTF-8").mode("overwrite").format('hive').saveAsTable(score_norm_table)
diff --git a/Model/lookalike-model/lookalike_model/application/pipeline/score_vector_rebucketing.py b/Model/lookalike-model/lookalike_model/application/pipeline/score_vector_rebucketing.py
new file mode 100644
index 0000000..a947795
--- /dev/null
+++ b/Model/lookalike-model/lookalike_model/application/pipeline/score_vector_rebucketing.py
@@ -0,0 +1,103 @@
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+
+#  http://www.apache.org/licenses/LICENSE-2.0.html
+
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+import yaml
+import argparse
+from pyspark import SparkContext
+from pyspark.sql import HiveContext
+from pyspark.sql.functions import lit, col, udf
+from pyspark.sql.types import FloatType, StringType, StructType, StructField, ArrayType, MapType, IntegerType
+# from rest_client import predict, str_to_intlist
+import requests
+import json
+import argparse
+from pyspark.sql.functions import udf
+from math import sqrt
+import time
+import hashlib
+
+'''
+
+To run, execute the following in application folder.
+spark-submit --master yarn --num-executors 20 --executor-cores 5 --executor-memory 8G --driver-memory 8G --conf spark.driver.maxResultSize=5g --conf spark.hadoop.hive.exec.dynamic.partition=true --conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict score_vector_rebucketing.py config.yml
+
+This process generates added secondary buckects ids (alpha-did-bucket).
+
+'''
+
+
+def __save_as_table(df, table_name, hive_context, create_table):
+
+    if create_table:
+        command = """
+            DROP TABLE IF EXISTS {}
+            """.format(table_name)
+
+        hive_context.sql(command)
+
+        df.createOrReplaceTempView("r907_temp_table")
+
+        command = """
+            CREATE TABLE IF NOT EXISTS {} as select * from r907_temp_table
+            """.format(table_name)
+
+        hive_context.sql(command)
+
+
+def assign_new_bucket_id(df, n, new_column_name):
+    def __hash_sha256(s):
+        hex_value = hashlib.sha256(s.encode('utf-8')).hexdigest()
+        return int(hex_value, 16)
+    _udf = udf(lambda x: __hash_sha256(x) % n, IntegerType())
+    df = df.withColumn(new_column_name, _udf(df.did))
+    return df
+
+
+def run(hive_context, cfg):
+
+    score_vector_table = cfg['score_vector']['score_vector_table']
+    bucket_size = cfg['score_vector_rebucketing']['did_bucket_size']
+    bucket_step = cfg['score_vector_rebucketing']['did_bucket_step']
+    alpha_bucket_size = cfg['score_vector_rebucketing']['alpha_did_bucket_size']
+    score_vector_alpha_table = cfg['score_vector_rebucketing']['score_vector_alpha_table']
+
+    first_round = True
+    for start_bucket in range(0, bucket_size, bucket_step):
+        command = "SELECT did, did_bucket, score_vector FROM {} WHERE did_bucket BETWEEN {} AND {}".format(score_vector_table, start_bucket, start_bucket+bucket_size-1)
+
+        df = hive_context.sql(command)
+        df = assign_new_bucket_id(df, alpha_bucket_size, 'alpha_did_bucket')
+        df = df.select('did', 'did_bucket', 'score_vector', 'alpha_did_bucket')
+        __save_as_table(df, table_name=score_vector_alpha_table, hive_context=hive_context, create_table=first_round)
+        first_round = False
+
+
+if __name__ == "__main__":
+    start = time.time()
+    parser = argparse.ArgumentParser(description='')
+    parser.add_argument('config_file')
+    args = parser.parse_args()
+    with open(args.config_file, 'r') as yml_file:
+        cfg = yaml.safe_load(yml_file)
+
+    sc = SparkContext.getOrCreate()
+    sc.setLogLevel('WARN')
+    hive_context = HiveContext(sc)
+
+    run(hive_context=hive_context, cfg=cfg)
+    sc.stop()
+    end = time.time()
+    print('Runtime of the program is:', (end - start))
diff --git a/Model/lookalike-model/lookalike_model/application/pipeline/score_vector_table.py b/Model/lookalike-model/lookalike_model/application/pipeline/score_vector_table.py
new file mode 100644
index 0000000..91e8aa7
--- /dev/null
+++ b/Model/lookalike-model/lookalike_model/application/pipeline/score_vector_table.py
@@ -0,0 +1,111 @@
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+
+#  http://www.apache.org/licenses/LICENSE-2.0.html
+
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+import yaml
+import argparse
+from pyspark import SparkContext
+from pyspark.sql import HiveContext
+from pyspark.sql.functions import lit, col, udf
+from pyspark.sql.types import FloatType, StringType, StructType, StructField, ArrayType, MapType
+# from rest_client import predict, str_to_intlist
+import requests
+import json
+import argparse
+from pyspark.sql.functions import udf
+from math import sqrt
+import time
+
+'''
+
+To run, execute the following in application folder.
+spark-submit --master yarn --num-executors 20 --executor-cores 5 --executor-memory 8G --driver-memory 8G --conf spark.driver.maxResultSize=5g --conf spark.hadoop.hive.exec.dynamic.partition=true --conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict score_vector_table.py config.yml
+
+This process generates the score_vector_table table.
+
+The top-n-similarity table is 
+
+|user| score-vector | did-bucket
+|:-------------| :------------: |
+|user-1-did| [similarity-score-11, similarity-score-12, similarity-score-13] | 1
+|user-2-did| [similarity-score-21, similarity-score-22, similarity-score-23] | 1
+|user-3-did| [similarity-score-31, similarity-score-32, similarity-score-33] | 2
+
+'''
+
+
+def __save_as_table(df, table_name, hive_context, create_table):
+
+    if create_table:
+        command = """
+            DROP TABLE IF EXISTS {}
+            """.format(table_name)
+
+        hive_context.sql(command)
+
+        df.createOrReplaceTempView("r907_temp_table")
+
+        command = """
+            CREATE TABLE IF NOT EXISTS {} as select * from r907_temp_table
+            """.format(table_name)
+
+        hive_context.sql(command)
+
+
+def run(hive_context, cfg):
+
+    keywords_table = cfg["score_vector"]["keywords_table"]
+    score_norm_table = cfg['score_vector']['score_norm_table']
+    score_vector_table = cfg['score_vector']['score_vector_table']
+    bucket_size = cfg['score_vector']['did_bucket_size']
+    bucket_step = cfg['score_vector']['did_bucket_step']
+
+    # get kw list
+    keywords = hive_context.sql("SELECT DISTINCT(keyword) FROM {}".format(keywords_table)).collect()
+    keywords = [_['keyword'] for _ in keywords]
+    keywords = sorted(keywords)
+
+    # add score-vector iterativly
+    first_round = True
+    for start_bucket in range(0, bucket_size, bucket_step):
+        command = "SELECT did, did_bucket, kws FROM {} WHERE did_bucket BETWEEN {} AND {}".format(score_norm_table, start_bucket, start_bucket+bucket_size-1)
+
+        # |0004f3b4731abafa9ac54d04cb88782ed61d30531262decd799d91beb6d6246a|0         |
+        # [social -> 0.24231663, entertainment -> 0.20828941, reading -> 0.44120282, video -> 0.34497723, travel -> 0.3453492, shopping -> 0.5347804, info -> 0.1978679]|
+        df = hive_context.sql(command)
+        df = df.withColumn("score_vector",
+                           udf(lambda kws: [kws[keyword] if keyword in kws else 0.0 for keyword in keywords], ArrayType(FloatType()))(df.kws))
+
+        df = df.select('did', 'did_bucket', 'score_vector')
+        __save_as_table(df, table_name=score_vector_table, hive_context=hive_context, create_table=first_round)
+        first_round = False
+
+
+if __name__ == "__main__":
+    start = time.time()
+    parser = argparse.ArgumentParser(description=" ")
+    parser.add_argument('config_file')
+    args = parser.parse_args()
+    with open(args.config_file, 'r') as yml_file:
+        cfg = yaml.safe_load(yml_file)
+
+    sc = SparkContext.getOrCreate()
+    sc.setLogLevel('WARN')
+    hive_context = HiveContext(sc)
+
+    run(hive_context=hive_context, cfg=cfg)
+    sc.stop()
+    end = time.time()
+    print('Runtime of the program is:', (end - start))
diff --git a/Model/lookalike-model/lookalike_model/application/seed_user_selector.py b/Model/lookalike-model/lookalike_model/application/pipeline/seed_user_selector.py
similarity index 81%
rename from Model/lookalike-model/lookalike_model/application/seed_user_selector.py
rename to Model/lookalike-model/lookalike_model/application/pipeline/seed_user_selector.py
index ffb8925..5640652 100644
--- a/Model/lookalike-model/lookalike_model/application/seed_user_selector.py
+++ b/Model/lookalike-model/lookalike_model/application/pipeline/seed_user_selector.py
@@ -26,15 +26,17 @@
 
 '''
 
-def run(hive_context,cfg, kwi):
+
+def run(hive_context, cfg, kwi):
     seed_user_table = cfg['input']['seeduser_table']
-    log_table =cfg ['input']['log_table']
-    number_of_seeduser = cfg ['input']['number_of_seeduser']
+    log_table = cfg['input']['log_table']
+    number_of_seeduser = cfg['input']['number_of_seeduser']
+
     # command = "select * from (select * from {} where is_click=1 and keyword_index=29) as s join (select * from {} where is_click=1 and keyword_index=26) as b on b.did = s.did where s.gender = 1"
-    command = "select * from {} where is_click=1 and ( keyword_index={})"
-    df = hive_context.sql(command.format(log_table,kwi))
+    command = "SELECT * FROM {} WHERE is_click=1 AND keyword_index={}"
+    df = hive_context.sql(command.format(log_table, kwi))
     user_list = df.select('did').alias('did').distinct().limit(number_of_seeduser)
-    print('number of seed user is: ', user_list.count())
+    user_list.cache()
 
     user_list.write.option("header", "true").option(
         "encoding", "UTF-8").mode("overwrite").format('hive').saveAsTable(seed_user_table)
@@ -46,7 +48,7 @@
     """
     parser = argparse.ArgumentParser(description=" ")
     parser.add_argument('config_file')
-    parser.add_argument('kwi' )
+    parser.add_argument('kwi')
     args = parser.parse_args()
     kwi = args.kwi
     with open(args.config_file, 'r') as yml_file:
@@ -56,5 +58,5 @@
     sc.setLogLevel('WARN')
     hive_context = HiveContext(sc)
 
-    run(hive_context=hive_context,cfg=cfg, kwi=kwi)
-    sc.stop()
\ No newline at end of file
+    run(hive_context=hive_context, cfg=cfg, kwi=kwi)
+    sc.stop()
diff --git a/Model/lookalike-model/lookalike_model/application/pipeline/top_n_similarity_table_generator.py b/Model/lookalike-model/lookalike_model/application/pipeline/top_n_similarity_table_generator.py
new file mode 100644
index 0000000..ae28fbf
--- /dev/null
+++ b/Model/lookalike-model/lookalike_model/application/pipeline/top_n_similarity_table_generator.py
@@ -0,0 +1,128 @@
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+
+#  http://www.apache.org/licenses/LICENSE-2.0.html
+
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+import yaml
+import argparse
+import pyspark.sql.functions as fn
+
+from pyspark import SparkContext
+from pyspark.sql import HiveContext
+from pyspark.sql.types import FloatType, StringType, StructType, StructField, ArrayType, MapType, StructType
+
+# from rest_client import predict, str_to_intlist
+import requests
+import json
+import argparse
+from pyspark.sql.functions import udf
+from math import sqrt
+import time
+import numpy as np
+import itertools
+import heapq
+
+'''
+This process generates the top-n-similarity table.
+
+spark-submit --master yarn --num-executors 20 --executor-cores 5 --executor-memory 16G --driver-memory 16G --conf spark.driver.maxResultSize=5g --conf spark.hadoop.hive.exec.dynamic.partition=true --conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict top_n_similarity_table_generator.py config.yml
+
+The top-n-similarity table is 
+
+|user| top-N-similarity|top-n-users
+|:-------------| :------------: |
+|user-1-did| [similarity-score-11, similarity-score-12, similarity-score-13] |[user-did-1, user-did-2, user-did-3]|
+|user-2-did| [similarity-score-21, similarity-score-22, similarity-score-23] |[user-did-10, user-did-20, user-did-30]|
+|user-3-did| [similarity-score-31, similarity-score-32, similarity-score-33] |[user-did-23, user-did-87, user-did-45]|
+
+'''
+
+
+def __save_as_table(df, table_name, hive_context, create_table):
+
+    if create_table:
+        command = """
+            DROP TABLE IF EXISTS {}
+            """.format(table_name)
+
+        hive_context.sql(command)
+
+        df.createOrReplaceTempView("r907_temp_table")
+
+        command = """
+            CREATE TABLE IF NOT EXISTS {} as select * from r907_temp_table
+            """.format(table_name)
+
+        hive_context.sql(command)
+
+
+def run(sc, hive_context, cfg):
+
+    score_vector_alpha_table = cfg['score_vector_rebucketing']['score_vector_alpha_table']
+    similarity_table = cfg['top_n_similarity']['similarity_table']
+    N = cfg['top_n_similarity']['top_n']
+
+    command = "SELECT did, score_vector FROM {}".format(score_vector_alpha_table)
+
+    # |0004f3b4731abafa9ac54d04cb88782ed61d30531262decd799d91beb6d6246a|0         |
+    # [0.24231663, 0.20828941, 0.0]|
+    df = hive_context.sql(command)
+    df = df.withColumn('top_n_user_score', fn.array())
+
+    alpha_bucket_size = cfg['score_vector_rebucketing']['alpha_did_bucket_size']
+    alpha_bucket_step = cfg['top_n_similarity']['alpha_did_bucket_step']
+
+    first_round = True
+    for start_bucket in range(0, alpha_bucket_size,alpha_bucket_step):
+        command = "SELECT did, did_bucket, score_vector, alpha_did_bucket FROM {} WHERE alpha_did_bucket BETWEEN {} AND {}".format(score_vector_alpha_table, 
+        start_bucket, start_bucket + alpha_bucket_size - 1)
+
+        df_user = hive_context.sql(command)
+        block_user = df_user.select('did', 'score_vector').collect()
+        block_user = ([_['did'] for _ in block_user], [_['score_vector'] for _ in block_user])
+        block_user_broadcast = sc.broadcast(block_user)
+
+        def calculate_similarity(user_score_vector, top_n_user_score):
+            user_score_vector = np.array(user_score_vector)
+            dids, other_score_vectors = block_user_broadcast.value
+            other_score_vectors = np.array(other_score_vectors)
+            product = np.matmul(user_score_vector, other_score_vectors.transpose()).tolist()
+            user_score_s = list(itertools.izip(dids, product))
+            user_score_s.extend(top_n_user_score)
+            user_score_s = heapq.nlargest(N, user_score_s, key=lambda x: x[1])
+            return user_score_s
+
+        elements_type = StructType([StructField('did', StringType(), False), StructField('score', FloatType(), False)])
+        df = df.withColumn('top_n_user_score', udf(calculate_similarity, ArrayType(elements_type))(df.score_vector, df.top_n_user_score))
+    
+    __save_as_table(df.select('did', 'top_n_user_score'), similarity_table, hive_context, True)
+        
+
+
+if __name__ == "__main__":
+    start = time.time()
+    parser = argparse.ArgumentParser(description=" ")
+    parser.add_argument('config_file')
+    args = parser.parse_args()
+    with open(args.config_file, 'r') as yml_file:
+        cfg = yaml.safe_load(yml_file)
+
+    sc = SparkContext.getOrCreate()
+    sc.setLogLevel('INFO')
+    hive_context = HiveContext(sc)
+
+    run(sc=sc, hive_context=hive_context, cfg=cfg)
+    sc.stop()
+    end = time.time()
+    print('Runtime of the program is:', (end - start))
diff --git a/Model/lookalike-model/lookalike_model/application/validation.py b/Model/lookalike-model/lookalike_model/application/pipeline/validation.py
similarity index 100%
rename from Model/lookalike-model/lookalike_model/application/validation.py
rename to Model/lookalike-model/lookalike_model/application/pipeline/validation.py
diff --git a/Model/lookalike-model/lookalike_model/application/run.sh b/Model/lookalike-model/lookalike_model/application/run.sh
deleted file mode 100644
index c4b07e6..0000000
--- a/Model/lookalike-model/lookalike_model/application/run.sh
+++ /dev/null
@@ -1,6 +0,0 @@
-#!/bin/bash
-spark-submit --executor-memory 16G --driver-memory 24G  --num-executors 16 --executor-cores 5 --master yarn --conf spark.driver.maxResultSize=8g seed_user_selector.py config.yml "29" ;
-spark-submit --executor-memory 16G --driver-memory 24G  --num-executors 16 --executor-cores 5 --master yarn --conf spark.driver.maxResultSize=8g score_generator.py config.yml ;
-spark-submit --executor-memory 16G --driver-memory 24G  --num-executors 16 --executor-cores 5 --master yarn --conf spark.driver.maxResultSize=8g distance_table_list.py config.yml ;
-spark-submit --executor-memory 16G --driver-memory 24G  --num-executors 16 --executor-cores 5 --master yarn --conf spark.driver.maxResultSize=8g validation.py config.yml "29";
-
diff --git a/Model/lookalike-model/lookalike_model/trainer/model_multi_gpu.py b/Model/lookalike-model/lookalike_model/trainer/model_multi_gpu.py
new file mode 100644
index 0000000..c96a8de
--- /dev/null
+++ b/Model/lookalike-model/lookalike_model/trainer/model_multi_gpu.py
@@ -0,0 +1,391 @@
+import tensorflow as tf
+
+from dice import dice
+from tensorflow.python.ops.rnn_cell import GRUCell
+# from tensorflow.python.ops.rnn import dynamic_rn
+from rnn import dynamic_rnn
+USE_DICE = True
+USE_RNN = False #True
+
+class Model(object):
+
+  def __init__(self, user_count, item_count, cate_count, cate_list, predict_batch_size, predict_ads_num, reuse):
+
+    with tf.variable_scope('DinNet', reuse=reuse):
+        self.u = tf.placeholder(tf.int32, [None,]) # [B]
+        self.i = tf.placeholder(tf.int32, [None,]) # [B]
+        self.j = tf.placeholder(tf.int32, [None,]) # [B]
+        self.y = tf.placeholder(tf.int32, [None,]) # [B]
+        self.hist_i = tf.placeholder(tf.int32, [None, None]) # [B, T]
+        self.sl = tf.placeholder(tf.int32, [None,]) # [B]
+        self.lr = tf.placeholder(tf.float64, [])
+
+        hidden_units = 128
+
+        item_emb_w = tf.get_variable("item_emb_w", [item_count, hidden_units // 2])
+        item_b = tf.get_variable("item_b", [item_count], initializer=tf.constant_initializer(0.0))
+        cate_emb_w = tf.get_variable("cate_emb_w", [cate_count, hidden_units // 2])
+        cate_list = tf.convert_to_tensor(cate_list, dtype=tf.int64)
+
+        ic = tf.gather(cate_list, self.i)
+        i_emb = tf.concat(values = [
+            tf.nn.embedding_lookup(item_emb_w, self.i),
+            tf.nn.embedding_lookup(cate_emb_w, ic),
+            ], axis=1)
+        i_b = tf.gather(item_b, self.i)
+
+        jc = tf.gather(cate_list, self.j)
+        j_emb = tf.concat([
+            tf.nn.embedding_lookup(item_emb_w, self.j),
+            tf.nn.embedding_lookup(cate_emb_w, jc),
+            ], axis=1)
+        j_b = tf.gather(item_b, self.j)
+
+        hc = tf.gather(cate_list, self.hist_i)
+        h_emb = tf.concat([
+            tf.nn.embedding_lookup(item_emb_w, self.hist_i),
+            tf.nn.embedding_lookup(cate_emb_w, hc),
+            ], axis=2)
+
+        if USE_RNN:
+            rnn_outputs, _ = dynamic_rnn(GRUCell(hidden_units), inputs=h_emb, sequence_length=self.sl, dtype=tf.float32, scope='gru1')
+            hist_i =attention(i_emb, rnn_outputs, self.sl)
+        else:
+            hist_i =attention(i_emb, h_emb, self.sl)
+        #-- attention end ---
+
+        hist_i = tf.layers.batch_normalization(inputs = hist_i)
+        hist_i = tf.reshape(hist_i, [-1, hidden_units], name='hist_bn')
+        hist_i = tf.layers.dense(hist_i, hidden_units, name='hist_fcn')
+
+        u_emb_i = hist_i
+
+        if USE_RNN:
+            hist_j =attention(j_emb, rnn_outputs, self.sl)
+        else:
+            hist_j =attention(j_emb, h_emb, self.sl)
+        #-- attention end ---
+
+        hist_j = tf.layers.batch_normalization(inputs = hist_j, reuse=True)
+        hist_j = tf.reshape(hist_j, [-1, hidden_units], name='hist_bn')
+        hist_j = tf.layers.dense(hist_j, hidden_units, name='hist_fcn', reuse=True)
+
+        u_emb_j = hist_j
+        print('shapes:')
+        print(f'(u_emb_i, u_emb_j, i_emb, j_emb) -> ({u_emb_i.get_shape().as_list()}, {u_emb_j.get_shape().as_list()}, {i_emb.get_shape().as_list()}, {j_emb.get_shape().as_list()})')
+
+        #-- fcn begin -------
+        din_i = tf.concat([u_emb_i, i_emb], axis=-1)
+        din_i = tf.layers.batch_normalization(inputs=din_i, name='b1')
+
+        if USE_DICE:
+            d_layer_1_i = tf.layers.dense(din_i, 80, activation=None, name='f1')
+            d_layer_1_i = dice(d_layer_1_i, name='dice_1')
+            d_layer_2_i = tf.layers.dense(d_layer_1_i, 40, activation=None, name='f2')
+            d_layer_2_i = dice(d_layer_2_i, name='dice_2')
+        else:
+            d_layer_1_i = tf.layers.dense(din_i, 80, activation=tf.nn.sigmoid, name='f1')
+            d_layer_2_i = tf.layers.dense(d_layer_1_i, 40, activation=tf.nn.sigmoid, name='f2')
+
+        #if u want try dice change sigmoid to None and add dice layer like following two lines. You can also find model_dice.py in this folder.
+        d_layer_3_i = tf.layers.dense(d_layer_2_i, 1, activation=None, name='f3')
+        din_j = tf.concat([u_emb_j, j_emb], axis=-1)
+        din_j = tf.layers.batch_normalization(inputs=din_j, name='b1', reuse=True)
+
+        if USE_DICE:
+            d_layer_1_j = tf.layers.dense(din_j, 80, activation=None, name='f1', reuse=True)
+            d_layer_1_j = dice(d_layer_1_j, name='dice_1')
+            d_layer_2_j = tf.layers.dense(d_layer_1_j, 40, activation=None, name='f2', reuse=True)
+            d_layer_2_j = dice(d_layer_2_j, name='dice_2')
+        else:
+            d_layer_1_j = tf.layers.dense(din_j, 80, activation=tf.nn.sigmoid, name='f1', reuse=True)
+            d_layer_2_j = tf.layers.dense(d_layer_1_j, 40, activation=tf.nn.sigmoid, name='f2', reuse=True)
+
+        d_layer_3_j = tf.layers.dense(d_layer_2_j, 1, activation=None, name='f3', reuse=True)
+        d_layer_3_i = tf.reshape(d_layer_3_i, [-1])
+        d_layer_3_j = tf.reshape(d_layer_3_j, [-1])
+        x = i_b - j_b + d_layer_3_i - d_layer_3_j # [B]
+        self.logits = i_b + d_layer_3_i
+
+        # prediciton for selected items
+        # logits for selected item:
+        item_emb_all = tf.concat([
+            item_emb_w,
+            tf.nn.embedding_lookup(cate_emb_w, cate_list)
+            ], axis=1)
+        item_emb_sub = item_emb_all[:predict_ads_num,:]
+        item_emb_sub = tf.expand_dims(item_emb_sub, 0)
+        item_emb_sub = tf.tile(item_emb_sub, [predict_batch_size, 1, 1])
+        hist_sub =attention_multi_items(item_emb_sub, h_emb, self.sl)
+        #-- attention end ---
+
+        hist_sub = tf.layers.batch_normalization(inputs = hist_sub, name='hist_bn', reuse=tf.AUTO_REUSE)
+        hist_sub = tf.reshape(hist_sub, [-1, hidden_units])
+        hist_sub = tf.layers.dense(hist_sub, hidden_units, name='hist_fcn', reuse=tf.AUTO_REUSE)
+
+        u_emb_sub = hist_sub
+        item_emb_sub = tf.reshape(item_emb_sub, [-1, hidden_units])
+        din_sub = tf.concat([u_emb_sub, item_emb_sub], axis=-1)
+        din_sub = tf.layers.batch_normalization(inputs=din_sub, name='b1', reuse=True)
+        d_layer_1_sub = tf.layers.dense(din_sub, 80, activation=tf.nn.sigmoid, name='f1', reuse=True)
+        d_layer_2_sub = tf.layers.dense(d_layer_1_sub, 40, activation=tf.nn.sigmoid, name='f2', reuse=True)
+        d_layer_3_sub = tf.layers.dense(d_layer_2_sub, 1, activation=None, name='f3', reuse=True)
+        d_layer_3_sub = tf.reshape(d_layer_3_sub, [-1, predict_ads_num])
+        self.logits_sub = tf.sigmoid(item_b[:predict_ads_num] + d_layer_3_sub)
+        self.logits_sub = tf.reshape(self.logits_sub, [-1, predict_ads_num, 1])
+        #-- fcn end -------
+
+
+        self.mf_auc = tf.reduce_mean(tf.to_float(x > 0))
+        self.score_i = tf.sigmoid(i_b + d_layer_3_i)
+        self.score_j = tf.sigmoid(j_b + d_layer_3_j)
+        self.score_i = tf.reshape(self.score_i, [-1, 1])
+        self.score_j = tf.reshape(self.score_j, [-1, 1])
+        self.p_and_n = tf.concat([self.score_i, self.score_j], axis=-1)
+        print(f'p_and_n -> {self.p_and_n.get_shape().as_list()}')
+
+        # Step variable
+        self.global_step = tf.Variable(0, trainable=False, name='global_step')
+        self.global_epoch_step = tf.Variable(0, trainable=False, name='global_epoch_step')
+        self.global_epoch_step_op = tf.assign(self.global_epoch_step, self.global_epoch_step+1)
+
+        self.loss = tf.reduce_mean(
+            tf.nn.sigmoid_cross_entropy_with_logits(
+                logits=self.logits,
+                # labels=self.y)
+                labels=tf.cast(self.y, tf.float32))
+            )
+
+        self.trainable_params = tf.trainable_variables()
+        self.opt = tf.train.GradientDescentOptimizer(learning_rate=self.lr)
+        self.gradients = tf.gradients(self.loss, self.trainable_params)
+        self.clip_gradients, _ = tf.clip_by_global_norm(self.gradients, 5)
+        self.train_op = self.opt.apply_gradients(
+            zip(self.clip_gradients, self.trainable_params), global_step=self.global_step)
+
+
+  def train(self, sess, uij, l):
+    loss, _ = sess.run([self.loss, self.train_op], feed_dict={
+        self.u: uij[0],
+        self.i: uij[1],
+        self.y: uij[2],
+        self.hist_i: uij[3],
+        self.sl: uij[4],
+        self.lr: l,
+        })
+    return loss
+    # return loss, gradients
+
+  def eval(self, sess, uij):
+    u_auc, socre_p_and_n = sess.run([self.mf_auc, self.p_and_n], feed_dict={
+        self.u: uij[0],
+        self.i: uij[1],
+        self.j: uij[2],
+        self.hist_i: uij[3],
+        self.sl: uij[4],
+        })
+    return u_auc, socre_p_and_n
+  
+  def eval_logdata(self, sess, uij):
+      score_i = sess.run([self.score_i], feed_dict={
+          self.u: uij[0],
+          self.i: uij[1],
+          self.hist_i: uij[3],
+          self.sl: uij[4],
+      })
+      return score_i
+
+  def test(self, sess, uij):
+    return sess.run(self.logits_sub, feed_dict={
+        self.u: uij[0],
+        self.i: uij[1],
+        self.j: uij[2],
+        self.hist_i: uij[3],
+        self.sl: uij[4],
+        })
+
+  def save(self, sess, path):
+    saver = tf.train.Saver()
+    saver.save(sess, save_path=path)
+
+  def restore(self, sess, path):
+    saver = tf.train.Saver()
+    saver.restore(sess, save_path=path)
+
+
+def extract_axis_1(data, ind):
+  batch_range = tf.range(tf.shape(data)[0])
+  indices = tf.stack([batch_range, ind], axis=1)
+  res = tf.gather_nd(data, indices)
+  return res
+
+
+def attention(queries, keys, keys_length):
+  '''
+    queries:     [B, H]
+    keys:        [B, T, H]
+    keys_length: [B]
+  '''
+  queries_hidden_units = queries.get_shape().as_list()[-1]
+  queries = tf.tile(queries, [1, tf.shape(keys)[1]])
+  queries = tf.reshape(queries, [-1, tf.shape(keys)[1], queries_hidden_units])
+  din_all = tf.concat([queries, keys, queries-keys, queries*keys], axis=-1)
+  d_layer_1_all = tf.layers.dense(din_all, 80, activation=tf.nn.sigmoid, name='f1_att', reuse=tf.AUTO_REUSE)
+  d_layer_2_all = tf.layers.dense(d_layer_1_all, 40, activation=tf.nn.sigmoid, name='f2_att', reuse=tf.AUTO_REUSE)
+  d_layer_3_all = tf.layers.dense(d_layer_2_all, 1, activation=None, name='f3_att', reuse=tf.AUTO_REUSE)
+  d_layer_3_all = tf.reshape(d_layer_3_all, [-1, 1, tf.shape(keys)[1]])
+  outputs = d_layer_3_all 
+  # Mask
+  key_masks = tf.sequence_mask(keys_length, tf.shape(keys)[1])   # [B, T]
+  key_masks = tf.expand_dims(key_masks, 1) # [B, 1, T]
+  paddings = tf.ones_like(outputs) * (-2 ** 32 + 1)
+  outputs = tf.where(key_masks, outputs, paddings)  # [B, 1, T]
+
+  # Scale
+  outputs = outputs / (keys.get_shape().as_list()[-1] ** 0.5)
+
+  # Activation
+  outputs = tf.nn.softmax(outputs)  # [B, 1, T]
+
+  # Weighted sum
+  outputs = tf.matmul(outputs, keys)  # [B, 1, H]
+
+  return outputs
+
+
+def attention_multi_items(queries, keys, keys_length):
+  '''
+    queries:     [B, N, H] N is the number of ads
+    keys:        [B, T, H] 
+    keys_length: [B]
+  '''
+  queries_hidden_units = queries.get_shape().as_list()[-1]
+  queries_nums = queries.get_shape().as_list()[1]
+  queries = tf.tile(queries, [1, 1, tf.shape(keys)[1]])
+  queries = tf.reshape(queries, [-1, queries_nums, tf.shape(keys)[1], queries_hidden_units]) # shape : [B, N, T, H]
+  max_len = tf.shape(keys)[1]
+  keys = tf.tile(keys, [1, queries_nums, 1])
+  keys = tf.reshape(keys, [-1, queries_nums, max_len, queries_hidden_units]) # shape : [B, N, T, H]
+  din_all = tf.concat([queries, keys, queries-keys, queries*keys], axis=-1)
+  d_layer_1_all = tf.layers.dense(din_all, 80, activation=tf.nn.sigmoid, name='f1_att', reuse=tf.AUTO_REUSE)
+  d_layer_2_all = tf.layers.dense(d_layer_1_all, 40, activation=tf.nn.sigmoid, name='f2_att', reuse=tf.AUTO_REUSE)
+  d_layer_3_all = tf.layers.dense(d_layer_2_all, 1, activation=None, name='f3_att', reuse=tf.AUTO_REUSE)
+  d_layer_3_all = tf.reshape(d_layer_3_all, [-1, queries_nums, 1, max_len])
+  outputs = d_layer_3_all 
+  # Mask
+  key_masks = tf.sequence_mask(keys_length, max_len)   # [B, T]
+  key_masks = tf.tile(key_masks, [1, queries_nums])
+  key_masks = tf.reshape(key_masks, [-1, queries_nums, 1, max_len]) # shape : [B, N, 1, T]
+  paddings = tf.ones_like(outputs) * (-2 ** 32 + 1)
+  outputs = tf.where(key_masks, outputs, paddings)  # [B, N, 1, T]
+
+  # Scale
+  outputs = outputs / (keys.get_shape().as_list()[-1] ** 0.5)
+
+  # Activation
+  outputs = tf.nn.softmax(outputs)  # [B, N, 1, T]
+  outputs = tf.reshape(outputs, [-1, 1, max_len])
+  keys = tf.reshape(keys, [-1, max_len, queries_hidden_units])
+
+  # Weighted sum
+  outputs = tf.matmul(outputs, keys)
+  outputs = tf.reshape(outputs, [-1, queries_nums, queries_hidden_units])  # [B, N, 1, H]
+  print(f'outputs -> {outputs.get_shape().as_list()}')
+  return outputs
+
+
+def DIN(i, j, y, hist_i, sl, item_count, cate_count, cate_list, reuse, is_training):
+
+    with tf.variable_scope('DinNet', reuse=reuse):
+
+        hidden_units = 128
+
+        item_emb_w = tf.get_variable("item_emb_w", [item_count, hidden_units // 2])
+        item_b = tf.get_variable("item_b", [item_count], initializer=tf.constant_initializer(0.0))
+        cate_emb_w = tf.get_variable("cate_emb_w", [cate_count, hidden_units // 2])
+        cate_list = tf.convert_to_tensor(cate_list, dtype=tf.int64)
+
+        ic = tf.gather(cate_list, i)
+        i_emb = tf.concat(values = [
+            tf.nn.embedding_lookup(item_emb_w, i),
+            tf.nn.embedding_lookup(cate_emb_w, ic),
+            ], axis=1)
+        i_b = tf.gather(item_b, i)
+
+        jc = tf.gather(cate_list, j)
+        j_emb = tf.concat([
+            tf.nn.embedding_lookup(item_emb_w, j),
+            tf.nn.embedding_lookup(cate_emb_w, jc),
+            ], axis=1)
+        j_b = tf.gather(item_b, j)
+
+        hc = tf.gather(cate_list, hist_i)
+        h_emb = tf.concat([
+            tf.nn.embedding_lookup(item_emb_w, hist_i),
+            tf.nn.embedding_lookup(cate_emb_w, hc),
+            ], axis=2)
+
+        if USE_RNN:
+            rnn_outputs, _ = dynamic_rnn(GRUCell(hidden_units), inputs=h_emb, sequence_length=sl, dtype=tf.float32, scope='gru1')
+            hist_i =attention(i_emb, rnn_outputs, sl)
+        else:
+            hist_i =attention(i_emb, h_emb, sl)
+        #-- attention end ---
+
+        hist_i = tf.layers.batch_normalization(inputs = hist_i)
+        hist_i = tf.reshape(hist_i, [-1, hidden_units], name='hist_bn')
+        hist_i = tf.layers.dense(hist_i, hidden_units, name='hist_fcn')
+
+        u_emb_i = hist_i
+
+        if USE_RNN:
+            hist_j =attention(j_emb, rnn_outputs, sl)
+        else:
+            hist_j =attention(j_emb, h_emb, sl)
+        #-- attention end ---
+
+        # hist_j = tf.layers.batch_normalization(inputs = hist_j)
+        hist_j = tf.layers.batch_normalization(inputs = hist_j, reuse=True)
+        hist_j = tf.reshape(hist_j, [-1, hidden_units], name='hist_bn')
+        hist_j = tf.layers.dense(hist_j, hidden_units, name='hist_fcn', reuse=True)
+
+        u_emb_j = hist_j
+        print('shapes:')
+        print(f'(u_emb_i, u_emb_j, i_emb, j_emb) -> ({u_emb_i.get_shape().as_list()}, {u_emb_j.get_shape().as_list()}, {i_emb.get_shape().as_list()}, {j_emb.get_shape().as_list()})')
+
+        #-- fcn begin -------
+        din_i = tf.concat([u_emb_i, i_emb], axis=-1)
+        din_i = tf.layers.batch_normalization(inputs=din_i, name='b1')
+
+        if USE_DICE:
+            d_layer_1_i = tf.layers.dense(din_i, 80, activation=None, name='f1')
+            d_layer_1_i = dice(d_layer_1_i, name='dice_1')
+            d_layer_2_i = tf.layers.dense(d_layer_1_i, 40, activation=None, name='f2')
+            d_layer_2_i = dice(d_layer_2_i, name='dice_2')
+        else:
+            d_layer_1_i = tf.layers.dense(din_i, 80, activation=tf.nn.sigmoid, name='f1')
+            d_layer_2_i = tf.layers.dense(d_layer_1_i, 40, activation=tf.nn.sigmoid, name='f2')
+
+        #if u want try dice change sigmoid to None and add dice layer like following two lines. You can also find model_dice.py in this folder.
+        d_layer_3_i = tf.layers.dense(d_layer_2_i, 1, activation=None, name='f3')
+        din_j = tf.concat([u_emb_j, j_emb], axis=-1)
+        din_j = tf.layers.batch_normalization(inputs=din_j, name='b1', reuse=True)
+
+        if USE_DICE:
+            d_layer_1_j = tf.layers.dense(din_j, 80, activation=None, name='f1', reuse=True)
+            d_layer_1_j = dice(d_layer_1_j, name='dice_1')
+            d_layer_2_j = tf.layers.dense(d_layer_1_j, 40, activation=None, name='f2', reuse=True)
+            d_layer_2_j = dice(d_layer_2_j, name='dice_2')
+        else:
+            d_layer_1_j = tf.layers.dense(din_j, 80, activation=tf.nn.sigmoid, name='f1', reuse=True)
+            d_layer_2_j = tf.layers.dense(d_layer_1_j, 40, activation=tf.nn.sigmoid, name='f2', reuse=True)
+
+        d_layer_3_j = tf.layers.dense(d_layer_2_j, 1, activation=None, name='f3', reuse=True)
+        d_layer_3_i = tf.reshape(d_layer_3_i, [-1])
+        d_layer_3_j = tf.reshape(d_layer_3_j, [-1])
+        x = i_b - j_b + d_layer_3_i - d_layer_3_j # [B]
+        logits = i_b + d_layer_3_i
+
+        logits = tf.sigmoid(logits) if not is_training else logits
+
+    return logits
diff --git a/Model/lookalike-model/lookalike_model/trainer/train_multi_gpu.py b/Model/lookalike-model/lookalike_model/trainer/train_multi_gpu.py
new file mode 100644
index 0000000..e0d65a8
--- /dev/null
+++ b/Model/lookalike-model/lookalike_model/trainer/train_multi_gpu.py
@@ -0,0 +1,303 @@
+import os
+os.environ['CUDA_VISIBLE_DEVICES'] = "0,1,2,3"
+
+import time
+import pickle
+import random
+import numpy as np
+import tensorflow as tf
+import sys
+from input import DataInput, DataInputTest
+from model_multi_gpu import Model, DIN
+from sklearn.metrics import roc_auc_score
+
+random.seed(1234)
+np.random.seed(1234)
+tf.set_random_seed(1234)
+
+train_batch_size = 2048 #1024
+test_batch_size = 2048 #1024
+predict_batch_size = 32
+predict_users_num = 1000
+# predict_ads_num = 6
+predict_ads_num = 30
+time_interval_num = 10
+gpu_num = 3
+
+with open('ad_dataset_lookalike_jimmy_' + str(time_interval_num) + '.pkl', 'rb') as f:
+  train_set = pickle.load(f)
+  test_set = pickle.load(f)
+  cate_list = pickle.load(f)
+  user_count, item_count, cate_count = pickle.load(f)
+
+with open('label_lookalike_jimmy_' + str(time_interval_num) + '.pkl', 'rb') as f:
+    test_lb = pickle.load(f)
+test_set_with_label = []
+for i in range(len(test_lb)):
+    test_set_with_label.append(tuple([test_set[i][0], test_set[i][1], test_set[i][2][0], test_lb[i]]))
+
+best_auc = 0.0
+best_auc_train = 0.0
+best_auc_test = 0.0
+def calc_auc(raw_arr):
+    """Summary
+
+    Args:
+        raw_arr (TYPE): Description
+
+    Returns:
+        TYPE: Description
+    """
+    # sort by pred value, from small to big
+    arr = sorted(raw_arr, key=lambda d:d[2])
+
+    auc = 0.0
+    fp1, tp1, fp2, tp2 = 0.0, 0.0, 0.0, 0.0
+    for record in arr:
+        fp2 += record[0] # noclick
+        tp2 += record[1] # click
+        auc += (fp2 - fp1) * (tp2 + tp1)
+        fp1, tp1 = fp2, tp2
+
+    # if all nonclick or click, disgard
+    threshold = len(arr) - 1e-3
+    if tp2 > threshold or fp2 > threshold:
+        return -0.5
+
+    if tp2 * fp2 > 0.0:  # normal auc
+        return (1.0 - auc / (2.0 * tp2 * fp2))
+    else:
+        return None
+
+
+def _auc_arr(score):
+  score_p = score[:,0]
+  score_n = score[:,1]
+  #print "============== p ============="
+  #print score_p
+  #print "============== n ============="
+  #print score_n
+  score_arr = []
+  for s in score_p.tolist():
+    score_arr.append([0, 1, s])
+  for s in score_n.tolist():
+    score_arr.append([1, 0, s])
+  return score_arr
+
+
+def _eval(sess, model):
+  auc_sum = 0.0
+  score_arr = []
+  for _, uij in DataInputTest(test_set, test_batch_size):
+    auc_, score_ = model.eval(sess, uij)
+    score_arr += _auc_arr(score_)
+    auc_sum += auc_ * len(uij[0])
+  test_gauc = auc_sum / len(test_set)
+  Auc = calc_auc(score_arr)
+  global best_auc
+  if best_auc < test_gauc:
+    best_auc = test_gauc
+    model.save(sess, 'save_path/ckpt')
+  return test_gauc, Auc
+
+
+def _eval_logdata(sess, model, dataset, batch_size, type):
+    score_arr = []
+    y = []
+    for _, uij in DataInput(dataset, batch_size):
+        score_ = model.eval_logdata(sess, uij)
+        score_arr.append(np.squeeze(score_[0]))
+        y.append(np.asarray(uij[2]))
+    # score_arr = np.vstack(score_arr)
+    score_arr = np.hstack(score_arr)
+    y = np.hstack(np.asarray(y))
+    Auc = roc_auc_score(y, score_arr)
+
+    global best_auc_train
+    global best_auc_test
+
+    if type=='train' and best_auc_train<Auc:
+        best_auc_train = Auc
+    if type=='test' and best_auc_test<Auc:
+        best_auc_test = Auc
+        model.save(sess, 'save_path/ckpt')
+    return Auc
+
+def _eval_logdata_improve(sess, dataset, batch_size, type):
+    score_arr = []
+    y = []
+    for _, uij in DataInput(dataset, batch_size):
+        score_ = sess.run([batch_logits_test], feed_dict={
+            i_batch: uij[1],
+            j_batch: uij[2],
+            y_batch: uij[2],
+            hist_i_batch: uij[3],
+            sl_batch: uij[4]
+        })
+
+        score_arr.append(np.squeeze(score_[0]))
+        y.append(np.asarray(uij[2]))
+    score_arr = np.hstack(score_arr)
+    y = np.hstack(np.asarray(y))
+    Auc = roc_auc_score(y[:len(score_arr)], score_arr)
+
+    global best_auc_train
+    global best_auc_test
+
+    if type=='train' and best_auc_train<Auc:
+        best_auc_train = Auc
+    if type=='test' and best_auc_test<Auc:
+        best_auc_test = Auc
+        saver = tf.train.Saver()
+        saver.save(sess, save_path='save_path/ckpt')
+
+    return Auc
+
+def _test(sess, model):
+  auc_sum = 0.0
+  score_arr = []
+  predicted_users_num = 0
+  print("test sub items")
+  for _, uij in DataInputTest(test_set, predict_batch_size):
+    if predicted_users_num >= predict_users_num:
+        break
+    score_ = model.test(sess, uij)
+    score_arr.append(score_)
+    predicted_users_num += predict_batch_size
+  return score_[0]
+
+
+PS_OPS = ['Variable', 'VariableV2', 'AutoReloadVariable']
+def assign_to_device(device, ps_device='/cpu:0'):
+    def _assign(op):
+        node_def = op if isinstance(op, tf.NodeDef) else op.node_def
+        if node_def.op in PS_OPS:
+            return "/" + ps_device
+        else:
+            return device
+
+    return _assign
+
+
+def average_gradients(tower_grads):
+    average_grads, cnt = [], 0
+    for grad_and_vars in zip(*tower_grads):
+        grads = []
+        for g, _ in grad_and_vars:
+            if g is not None:
+                expanded_g = tf.expand_dims(g, 0)
+            else:
+                expanded_g = None
+
+            grads.append(expanded_g)
+
+        if all(x is None for x in grads):
+            grad = None
+        else:
+            grad = tf.concat(grads, 0)
+            grad = tf.reduce_mean(grad, 0)
+
+        v = grad_and_vars[0][1]
+        grad_and_var = (grad, v)
+        average_grads.append(grad_and_var)
+        cnt += 1
+
+        rlt_grads = [v[0] for v in average_grads]
+        rlt_vars = [v[1] for v in average_grads]
+    return average_grads
+    # return rlt_grads, rlt_vars
+
+if min(cate_list)>0:
+    item_count += 1
+
+lr = 1.0  # 0.1
+reuse_vars = False
+
+tf.reset_default_graph()
+
+with tf.device('/cpu:0'):
+    tower_grads, tower_logits_test = [], []
+    reuse_vars = False
+
+    # tf Graph input
+    i_batch = tf.placeholder(tf.int32, [None,]) # [B]
+    j_batch = tf.placeholder(tf.int32, [None,]) # [B]
+    y_batch = tf.placeholder(tf.int32, [None,]) # [B]
+    hist_i_batch = tf.placeholder(tf.int32, [None, None]) # [B, T]
+    sl_batch = tf.placeholder(tf.int32, [None,]) # [B]
+
+    for i in range(gpu_num):
+        with tf.device(assign_to_device('/gpu:{}'.format(i), ps_device='/cpu:0')):
+            delta = tf.shape(i_batch)[0] // gpu_num
+            start, end = i*delta, (i+1)*delta
+            i = i_batch[start:end]
+            j = j_batch[start:end]
+            y = y_batch[start:end]
+            hist_i = hist_i_batch[start:end,:]
+            sl = sl_batch[start:end]
+
+            logits_train = DIN(i, j, y, hist_i, sl,
+                               item_count, cate_count, cate_list, reuse=reuse_vars, is_training=True)
+
+            logits_test = DIN(i, j, y, hist_i, sl,
+                              item_count, cate_count, cate_list, reuse=True, is_training=False)
+
+            loss_op = tf.reduce_mean(tf.nn.sigmoid_cross_entropy_with_logits(
+                logits=logits_train, labels=tf.cast(y, tf.float32)))
+
+            trainable_params = tf.trainable_variables()
+
+            gradients = tf.gradients(loss_op, trainable_params)
+            clip_gradients, _ = tf.clip_by_global_norm(gradients, 5)
+
+            grads = [(v, trainable_params[ind]) for ind, v in enumerate(clip_gradients)]
+
+            reuse_vars = True
+            tower_grads.append(grads)
+            tower_logits_test.append(logits_test)
+
+    batch_logits_test = tf.concat(tower_logits_test, 0)
+    average_grads = average_gradients(tower_grads)
+    optimizer = tf.train.GradientDescentOptimizer(learning_rate=lr)
+
+    train_op = optimizer.apply_gradients(average_grads)
+
+    init = tf.global_variables_initializer()
+
+    gpu_options = tf.GPUOptions(allow_growth=True)
+    # with tf.Session(config=tf.ConfigProto(gpu_options=gpu_options, device_count={'GPU': 2})) as sess:
+    with tf.Session(config=tf.ConfigProto(allow_soft_placement=True, log_device_placement=True, gpu_options=gpu_options)) as sess:
+
+      sess.run(tf.global_variables_initializer())
+      sess.run(tf.local_variables_initializer())
+      sess.run(init)
+
+      epoch2display = 2
+      start_time = time.time()
+      for epoch in range(10):
+
+        random.shuffle(train_set)
+
+        epoch_total_loss, cnt = 0.0, 0
+        for _, uij in DataInput(train_set, train_batch_size*gpu_num):
+
+          loss_, _ = sess.run([loss_op, train_op], feed_dict={
+                  i_batch: uij[1],
+                  j_batch: uij[2],
+                  y_batch: uij[2],
+                  hist_i_batch: uij[3],
+                  sl_batch: uij[4]
+              })
+
+          epoch_total_loss += loss_
+          cnt += 1
+          epoch_mean_loss = epoch_total_loss / float(cnt)
+
+        if (epoch>0) and ((epoch+1 % epoch2display==0) or (epoch==9)):
+            auc_train = _eval_logdata_improve(sess, train_set, train_batch_size*gpu_num, 'train')
+            auc_test = _eval_logdata_improve(sess, test_set_with_label, test_batch_size*gpu_num, 'test')
+            print(f'Epoch {epoch} DONE\tCost time: {time.time()-start_time}\ttrain epoch average loss: {epoch_mean_loss}\ttrain auc: {auc_train}\ttest auc: {auc_test}')
+            sys.stdout.flush()
+
+      print('best train_gauc %.4f\ttest_gauc: %.4f' % (best_auc_train, best_auc_test))
+      sys.stdout.flush()