Merge pull request #2 from radibnia77/main

Add tests
diff --git a/Model/lookalike-model/lookalike_model/pipeline/util.pyc b/Model/lookalike-model/lookalike_model/pipeline/util.pyc
deleted file mode 100644
index ef7660f..0000000
--- a/Model/lookalike-model/lookalike_model/pipeline/util.pyc
+++ /dev/null
Binary files differ
diff --git a/Model/lookalike-model/lookalike_model/trainer/lookalike_build_dataset.py b/Model/lookalike-model/lookalike_model/trainer/lookalike_build_dataset.py
index cb10d62..b515876 100644
--- a/Model/lookalike-model/lookalike_model/trainer/lookalike_build_dataset.py
+++ b/Model/lookalike-model/lookalike_model/trainer/lookalike_build_dataset.py
@@ -35,11 +35,11 @@
 def __data_parser(serialized_example):
 
     features = tf.parse_single_example(serialized_example,
-                                       features={'interval_starting_time': tf.FixedLenSequenceFeature([], tf.int64, allow_missing=True),
-                                                 'keywords' :tf.FixedLenSequenceFeature([], tf.string, allow_missing=True),
+                                       features={'interval_starting_time': tf.FixedLenSequenceFeature([], tf.string, allow_missing=True),
+                                                 'kwi' :tf.FixedLenSequenceFeature([], tf.string, allow_missing=True),
                                                  'did_index': tf.FixedLenFeature([], tf.int64),
-                                                 'click_counts': tf.FixedLenSequenceFeature([], tf.string, allow_missing=True),
-                                                 'show_counts': tf.FixedLenSequenceFeature([], tf.string, allow_missing=True),
+                                                 'kwi_click_counts': tf.FixedLenSequenceFeature([], tf.string, allow_missing=True),
+                                                 'kwi_show_counts': tf.FixedLenSequenceFeature([], tf.string, allow_missing=True),
                                                  # 'media_category_index': tf.FixedLenFeature([], tf.int64),
                                                  # 'net_type_index': tf.FixedLenFeature([], tf.int64),
                                                  'gender': tf.FixedLenFeature([], tf.int64),
@@ -49,11 +49,11 @@
 
                                                  })
     did_str = tf.cast(features['did'], tf.string)
-    time_interval = tf.cast(features['interval_starting_time'], tf.int64)
-    keyword = tf.cast(features['keywords'], tf.string)
+    time_interval = tf.cast(features['interval_starting_time'], tf.string)
+    keyword = tf.cast(features['kwi'], tf.string)
     ucdoc = tf.cast(features['did_index'], tf.int64)
-    click_counts = tf.cast(features['click_counts'], tf.string)
-    show_counts = tf.cast(features['show_counts'], tf.string)
+    click_counts = tf.cast(features['kwi_click_counts'], tf.string)
+    show_counts = tf.cast(features['kwi_show_counts'], tf.string)
     # media_category = tf.cast(features['media_category_index'], tf.int64)
     # net_type_index = tf.cast(features['net_type_index'], tf.int64)
     gender = tf.cast(features['gender'], tf.int64)
@@ -126,11 +126,14 @@
     with open(stats, 'rb') as f:
         ucdoc_num = pickle.load(f)['distinct_records_count']
     counter = 0
+    mapping=[]
     for i in range(ucdoc_num):
         x = sess.run(next_el)
-        log = list(x[0:7])
-        time_interval, ucdoc, click_counts, show_counts, gender, age, keyword = log[0], log[1], log[2], log[3], log[4], log[5], log[6]
+        log = list(x[0:8])
+        time_interval_s, ucdoc, click_counts, show_counts, gender, age, keyword ,did = log[0], log[1], log[2], log[3], log[4], log[5], log[6], log[7]
+        mapping.append([did,ucdoc])
 
+        time_interval = [int(i) for i in time_interval_s]
         keyword_int = [[int(i) for i in keyword[j].decode().split(",")] for j in range(len(keyword))]
         show_counts_list = str_to_intlist(show_counts)
         click_counts_list = str_to_intlist(click_counts)
@@ -186,17 +189,20 @@
     cate_list = np.array([x for x in range(30)])
     user_count, item_count , cate_count = len(set(ucdoc_lst)) , 30, 30
     print(counter)
-    with open('label_gdin_30.pkl', 'wb') as f:
+    mapping = pd.DataFrame(mapping)
+    mapping.to_csv("mapping_pipeline.csv")
+    with open('label_lookalike.pkl', 'wb') as f:
         pickle.dump(label, f, pickle.HIGHEST_PROTOCOL)
         pickle.dump(user_att,f, pickle.HIGHEST_PROTOCOL )
 
-    with open('ad_dataset_gdin_30.pkl', 'wb') as f:
+    with open('ad_dataset_lookalike.pkl', 'wb') as f:
         pickle.dump(train_set, f, pickle.HIGHEST_PROTOCOL)
         pickle.dump(test_set, f, pickle.HIGHEST_PROTOCOL)
         pickle.dump(cate_list, f, pickle.HIGHEST_PROTOCOL)
         pickle.dump((user_count, item_count, cate_count), f, pickle.HIGHEST_PROTOCOL)
 
 
+
 if __name__ == '__main__':
 
     parser = argparse.ArgumentParser(description='Prepare data')
diff --git a/Model/lookalike-model/tests/pipeline/config_trainready.yml b/Model/lookalike-model/tests/pipeline/config_trainready.yml
new file mode 100644
index 0000000..a268f22
--- /dev/null
+++ b/Model/lookalike-model/tests/pipeline/config_trainready.yml
@@ -0,0 +1,26 @@
+product_tag: 'lookalike'
+pipeline_tag: 'unittest'
+log:
+  level: 'ERROR' # log level for spark and app
+pipeline:
+  main_clean:  
+    did_bucket_num: 2 # Number of partitions for did
+    load_logs_in_minutes: 1440 #1440/day, original=14400
+    conditions: {
+      'new_slot_id_list': [
+          'abcdef0', 'abcdef1', 'abcdef2', 'abcdef3', 'abcdef4', 
+          'abcdef5', 'abcdef6', 'abcdef7', 'abcdef8', 'abcdef9'
+      ],
+      'new_slot_id_app_name_list': [
+          'Huawei Magazine', 'Huawei Browser', 'Huawei Video', 'Huawei Music', 'Huawei Reading', 
+          'Huawei Magazine', 'Huawei Browser', 'Huawei Video', 'Huawei Music', 'Huawei Reading'
+      ],
+      'starting_date': '2020-01-01',
+      'ending_date': '2020-01-11'
+    }
+  main_logs:
+    interval_time_in_seconds: 86400 # default=1 day, group logs in interval time.
+    logs_output_table_name: 'lookalike_unittest_trainready_input'
+  main_trainready:
+    trainready_output_table: 'lookalike_unittest_trainready_output'
+
diff --git a/Model/lookalike-model/tests/pipeline/data_generator.py b/Model/lookalike-model/tests/pipeline/data_generator.py
index 1b33f91..4c9d90b 100644
--- a/Model/lookalike-model/tests/pipeline/data_generator.py
+++ b/Model/lookalike-model/tests/pipeline/data_generator.py
@@ -63,6 +63,10 @@
     df = create_keywords(spark)
     write_to_table(df, table_name)
 
+# Creates unified log data and writes it to Hive.
+def create_unified_log_table (spark, table_name):
+    df = create_unified_log(spark)
+    write_to_table(df, table_name)
 
 #==========================================
 # Create dataframes for the unit tests
@@ -160,6 +164,20 @@
 def create_cleaned_log (spark):
     data = [
         ('C000', '0000001', '1000', 'splash', 'abcdef0', 'DUB-AL00', 'WIFI', 'CPC', '2020-01-01 12:34:56.78', 'Huawei Magazine', 0, 0, 'travel', '1', '2020-01-01', '1', ),
+        # ('C000', '0000001', '1000', 'splash', 'abcdef0', 'DUB-AL00', 'WIFI', 'CPC', '2020-01-01 13:34:56.78', 'Huawei Magazine', 0, 0, 'travel', '1', '2020-01-01', '1', ),
+        # ('C000', '0000001', '1000', 'splash', 'abcdef0', 'DUB-AL00', 'WIFI', 'CPC', '2020-01-01 14:34:56.78', 'Huawei Magazine', 0, 0, 'travel', '1', '2020-01-01', '1', ),
+        # ('C000', '0000001', '1000', 'splash', 'abcdef0', 'DUB-AL00', 'WIFI', 'CPC', '2020-01-01 15:34:56.78', 'Huawei Magazine', 0, 0, 'travel', '1', '2020-01-01', '1', ),
+        # ('C000', '0000001', '1000', 'splash', 'abcdef0', 'DUB-AL00', 'WIFI', 'CPC', '2020-01-01 15:59:59.00', 'Huawei Magazine', 0, 0, 'travel', '1', '2020-01-01', '1', ),
+        # ('C000', '0000001', '1000', 'splash', 'abcdef0', 'DUB-AL00', 'WIFI', 'CPC', '2020-01-01 15:59:59.99', 'Huawei Magazine', 0, 0, 'travel', '1', '2020-01-01', '1', ),
+        # ('C000', '0000001', '1000', 'splash', 'abcdef0', 'DUB-AL00', 'WIFI', 'CPC', '2020-01-01 16:00:00.00', 'Huawei Magazine', 0, 0, 'travel', '1', '2020-01-01', '1', ),
+        # ('C000', '0000001', '1000', 'splash', 'abcdef0', 'DUB-AL00', 'WIFI', 'CPC', '2020-01-01 16:34:56.78', 'Huawei Magazine', 0, 0, 'travel', '1', '2020-01-01', '1', ),
+        # ('C000', '0000001', '1000', 'splash', 'abcdef0', 'DUB-AL00', 'WIFI', 'CPC', '2020-01-01 17:34:56.78', 'Huawei Magazine', 0, 0, 'travel', '1', '2020-01-01', '1', ),
+        # ('C000', '0000001', '1000', 'splash', 'abcdef0', 'DUB-AL00', 'WIFI', 'CPC', '2020-01-01 18:34:56.78', 'Huawei Magazine', 0, 0, 'travel', '1', '2020-01-01', '1', ),
+        # ('C000', '0000001', '1000', 'splash', 'abcdef0', 'DUB-AL00', 'WIFI', 'CPC', '2020-01-01 19:34:56.78', 'Huawei Magazine', 0, 0, 'travel', '1', '2020-01-01', '1', ),
+        # ('C000', '0000001', '1000', 'splash', 'abcdef0', 'DUB-AL00', 'WIFI', 'CPC', '2020-01-01 20:34:56.78', 'Huawei Magazine', 0, 0, 'travel', '1', '2020-01-01', '1', ),
+        # ('C000', '0000001', '1000', 'splash', 'abcdef0', 'DUB-AL00', 'WIFI', 'CPC', '2020-01-01 21:34:56.78', 'Huawei Magazine', 0, 0, 'travel', '1', '2020-01-01', '1', ),
+        # ('C000', '0000001', '1000', 'splash', 'abcdef0', 'DUB-AL00', 'WIFI', 'CPC', '2020-01-01 22:34:56.78', 'Huawei Magazine', 0, 0, 'travel', '1', '2020-01-01', '1', ),
+        # ('C000', '0000001', '1000', 'splash', 'abcdef0', 'DUB-AL00', 'WIFI', 'CPC', '2020-01-01 23:34:56.78', 'Huawei Magazine', 0, 0, 'travel', '1', '2020-01-01', '1', ),
         ('C001', '0000002', '1000', 'splash', 'abcdef1', 'DUB-AL00', 'WIFI', 'CPC', '2020-01-02 12:34:56.78', 'Huawei Browser', 1, 0, 'travel', '1', '2020-01-02', '1', ),
         ('C002', '0000003', '1001', 'native', 'abcdef2', 'ABC-AL00', '4G', 'CPD', '2020-01-03 12:34:56.78', 'Huawei Video', 0, 1, 'travel', '1', '2020-01-03', '1', ),
         ('C010', '0000004', '1001', 'native', 'abcdef3', 'ABC-AL00', '4G', 'CPD', '2020-01-04 12:34:56.78', 'Huawei Music', 1, 1, 'game-avg', '2', '2020-01-04', '1', ),
@@ -220,6 +238,85 @@
 
     return spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
 
+# Returns a dataframe with a unified log data.
+def create_unified_log (spark):
+
+
+    data = [
+        ('0000001', 0, '2020-01-01 12:34:56.78', 'travel', '1', 'splash', 'WIFI', 0, 0, '1000', 1577836800, 1577910896, '2020-01-01', '1', ),
+        ('0000001', 1, '2020-01-01 12:34:56.78', 'travel', '1', 'splash', 'WIFI', 0, 0, '1000', 1577836800, 1577910896, '2020-01-01', '1', ),
+        ('0000001', 1, '2020-01-01 12:34:56.78', 'travel', '1', 'splash', 'WIFI', 0, 0, '1000', 1577836800, 1577910896, '2020-01-01', '1', ),
+        ('0000001', 1, '2020-01-01 12:34:56.78', 'travel', '1', 'splash', 'WIFI', 0, 0, '1000', 1577836800, 1577910896, '2020-01-01', '1', ),
+        ('0000001', 0, '2020-01-01 12:34:56.78', 'game-avg', '2', 'splash', 'WIFI', 0, 0, '1000', 1577836800, 1577910896, '2020-01-01', '1', ),
+        ('0000001', 0, '2020-01-01 12:34:56.78', 'game-avg', '2', 'splash', 'WIFI', 0, 0, '1000', 1577836800, 1577910896, '2020-01-01', '1', ),
+        ('0000001', 1, '2020-01-01 12:34:56.78', 'game-avg', '2', 'splash', 'WIFI', 0, 0, '1000', 1577836800, 1577910896, '2020-01-01', '1', ),
+        ('0000001', 1, '2020-01-01 12:34:56.78', 'game-avg', '2', 'splash', 'WIFI', 0, 0, '1000', 1577836800, 1577910896, '2020-01-01', '1', ),
+        ('0000001', 0, '2020-01-01 12:34:56.78', 'reading', '3', 'splash', 'WIFI', 0, 0, '1000', 1577836800, 1577910896, '2020-01-01', '1', ),
+        ('0000001', 0, '2020-01-01 12:34:56.78', 'reading', '3', 'splash', 'WIFI', 0, 0, '1000', 1577836800, 1577910896, '2020-01-01', '1', ),
+        ('0000001', 0, '2020-01-01 12:34:56.78', 'reading', '3', 'splash', 'WIFI', 0, 0, '1000', 1577836800, 1577910896, '2020-01-01', '1', ),
+        ('0000001', 1, '2020-01-01 12:34:56.78', 'reading', '3', 'splash', 'WIFI', 0, 0, '1000', 1577836800, 1577910896, '2020-01-01', '1', ),
+        ('0000001', 0, '2020-01-02 12:34:56.78', 'travel', '1', 'splash', 'WIFI', 0, 0, '1000', 1577923200, 1577997296, '2020-01-02', '1', ),
+        ('0000001', 1, '2020-01-02 12:34:56.78', 'travel', '1', 'splash', 'WIFI', 0, 0, '1000', 1577923200, 1577997296, '2020-01-02', '1', ),
+        ('0000001', 0, '2020-01-03 12:34:56.78', 'travel', '1', 'native', '4G', 0, 0, '1001', 1578009600, 1578083696, '2020-01-03', '1', ),
+        ('0000001', 1, '2020-01-03 12:34:56.78', 'travel', '1', 'native', '4G', 0, 0, '1001', 1578009600, 1578083696, '2020-01-03', '1', ),
+        ('0000002', 0, '2020-01-02 12:34:56.78', 'travel', '1', 'splash', 'WIFI', 1, 0, '1000', 1577923200, 1577997296, '2020-01-02', '1', ),
+        ('0000002', 0, '2020-01-02 12:34:56.78', 'travel', '1', 'splash', 'WIFI', 1, 0, '1000', 1577923200, 1577997296, '2020-01-02', '1', ),
+        ('0000002', 0, '2020-01-02 12:34:56.78', 'travel', '1', 'splash', 'WIFI', 1, 0, '1000', 1577923200, 1577997296, '2020-01-02', '1', ),
+        ('0000003', 1, '2020-01-03 12:34:56.78', 'travel', '1', 'native', '4G', 0, 1, '1001', 1578009600, 1578083696, '2020-01-03', '1', ),
+        ('0000003', 1, '2020-01-03 12:34:56.78', 'travel', '1', 'native', '4G', 0, 1, '1001', 1578009600, 1578083696, '2020-01-03', '1', ),
+        ('0000003', 1, '2020-01-03 12:34:56.78', 'travel', '1', 'native', '4G', 0, 1, '1001', 1578009600, 1578083696, '2020-01-03', '1', ),
+    ]
+
+    schema = StructType([
+        StructField('did', StringType(), True),
+        StructField('is_click', IntegerType(), True),
+        StructField('action_time', StringType(), True),
+        StructField('keyword', StringType(), True),
+        StructField('keyword_index', StringType(), True),
+        StructField('media', StringType(), True),
+        StructField('net_type', StringType(), True),
+        StructField('gender', IntegerType(), True),
+        StructField('age', IntegerType(), True),
+        StructField('adv_id', StringType(), True),
+        StructField('interval_starting_time', IntegerType(), True),
+        StructField('action_time_seconds', IntegerType(), True),
+        StructField('day', StringType(), True),
+        StructField('did_bucket', StringType(), True),
+    ])
+
+    return spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
+
+# Returns a dataframe with trainready data.
+def create_trainready_data (spark):
+    data = [
+        (0, 0, '0000001', 1000000001, [u'1578009600', u'1577923200', u'1577836800'], [u'travel', u'travel', u'travel,game-avg'], [u'1', u'1', u'1,2'], [u'1:2', u'1:2', u'1:2,2:1'], [u'1:1', u'1:1', u'1:1,2:0'], '1', ),
+        (0, 1, '0000002', 1000000002, [u'1577923200'], [u'travel'], [u'1'], [u'1:2'], [u'1:1'], '1', ),
+        (1, 0, '0000003', 1000000003, [u'1578009600'], [u'travel'], [u'1'], [u'1:2'], [u'1:1'], '1', ),
+        (1, 1, '0000004', 1000000004, [u'1578096000'], [u'game-avg'], [u'2'], [u'2:2'], [u'2:1'], '1', ),
+        (2, 0, '0000005', 1000000005, [u'1578182400'], [u'game-avg'], [u'2'], [u'2:2'], [u'2:1'], '1', ),
+        (2, 1, '0000006', 1, [u'1578268800'], [u'game-avg'], [u'2'], [u'2:2'], [u'2:1'], '0', ),
+        (3, 0, '0000007', 2, [u'1578355200'], [u'reading'], [u'3'], [u'3:2'], [u'3:1'], '0', ),
+        (3, 1, '0000008', 3, [u'1578441600'], [u'reading'], [u'3'], [u'3:2'], [u'3:1'], '0', ),
+        (4, 0, '0000009', 4, [u'1578528000'], [u'reading'], [u'3'], [u'3:2'], [u'3:1'], '0', ),
+        (4, 1, '0000010', 1000000006, [u'1578614400'], [u'reading'], [u'3'], [u'3:2'], [u'3:1'], '1', ),
+    ]
+
+    schema = StructType([
+        StructField('age', IntegerType(), True),
+        StructField('gender', IntegerType(), True),
+        StructField('did', StringType(), True),
+        StructField('did_index', LongType(), True),
+        StructField('interval_starting_time', ArrayType(), True),
+        StructField('interval_keywords', ArrayType(), True),
+        StructField('kwi', ArrayType(), True),
+        StructField('kwi_show_counts', ArrayType(), True),
+        StructField('kwi_click_counts', ArrayType(), True),
+        StructField('did_bucket', StringType(), True),
+    ])
+
+    return spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
+
+
 # Prints to screen the code to generate the given data frame.
 def print_df_generator_code (df):
     columns = df.columns
diff --git a/Model/lookalike-model/tests/pipeline/test_main_clean.py b/Model/lookalike-model/tests/pipeline/test_main_clean.py
index 9163c86..3102fa3 100644
--- a/Model/lookalike-model/tests/pipeline/test_main_clean.py
+++ b/Model/lookalike-model/tests/pipeline/test_main_clean.py
@@ -144,6 +144,7 @@
 
         # Validate the cleaned clicklog table.
         df_clicklog = util.load_df(self.hive_context, clicklog_output_table)
+        print(df_clicklog.sort('action_time').show(100, False))
         self.validate_cleaned_log(df_clicklog, conditions, df_persona, df_keywords, df_log, cfg['pipeline']['main_clean']['did_bucket_num'])
 
         # Validate the cleaned showlog table.
diff --git a/Model/lookalike-model/tests/pipeline/test_main_logs.py b/Model/lookalike-model/tests/pipeline/test_main_logs.py
index 46112b7..69d8c7a 100644
--- a/Model/lookalike-model/tests/pipeline/test_main_logs.py
+++ b/Model/lookalike-model/tests/pipeline/test_main_logs.py
@@ -83,14 +83,15 @@
 
         # Validate the output.
         df = util.load_df(self.hive_context, log_output_table)
+        print(df.sort('action_time_seconds').show(100, False))
         print_df_generator_code(df.sort('did', 'is_click'))
         self.validate_unified_logs(df, create_cleaned_log(self.spark))
 
 
     def validate_unified_logs (self, df, df_log):
         # Verify the column names.
-        columns = ['is_click', 'did', 'adv_id', 'media', 
-            'net_type', 'action_time', 'gender', 'age', 
+        columns = ['is_click', 'did', 'adv_id', 'media', 'action_time_seconds',
+            'net_type', 'action_time', 'gender', 'age', 'interval_starting_time',
             'keyword', 'keyword_index', 'day', 'did_bucket']
         for name in columns:
             self.assertTrue(name in df.columns)
diff --git a/Model/lookalike-model/tests/pipeline/test_main_trainready.py b/Model/lookalike-model/tests/pipeline/test_main_trainready.py
new file mode 100644
index 0000000..8e2dd30
--- /dev/null
+++ b/Model/lookalike-model/tests/pipeline/test_main_trainready.py
@@ -0,0 +1,213 @@
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+ 
+#  http://www.apache.org/licenses/LICENSE-2.0.html
+
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+import unittest
+import yaml
+from pyspark import SparkContext
+from pyspark.sql import SparkSession, HiveContext
+from pyspark.sql.functions import col
+from pyspark.sql.types import IntegerType
+from lookalike_model.pipeline import main_trainready, util
+from data_generator import *
+
+class TestMainTrainReady (unittest.TestCase):
+
+    def setUp (self):
+        # Set the log level.
+        sc = SparkContext.getOrCreate()
+        sc.setLogLevel('ERROR')
+
+        # Initialize the Spark session
+        self.spark = SparkSession.builder.appName('unit test').enableHiveSupport().getOrCreate()
+        self.hive_context = HiveContext(sc)
+
+    def test_generate_trainready (self):
+        print('*** Running test_generate_trainready test ***')
+
+        # Load the configuration data.
+        with open('pipeline/config_trainready.yml', 'r') as ymlfile:
+            cfg = yaml.safe_load(ymlfile)
+
+        # Load the training parameters.
+        did_bucket_num = cfg['pipeline']['main_clean']['did_bucket_num']
+        interval_time_in_seconds = cfg['pipeline']['main_logs']['interval_time_in_seconds']
+        log_table = cfg['pipeline']['main_logs']['logs_output_table_name']
+        trainready_table = cfg['pipeline']['main_trainready']['trainready_output_table']
+
+        # Create the input data table.
+        create_unified_log_table(self.spark, log_table)
+
+        # Clear the output from any previous runs.
+        util.drop_table(self.hive_context, trainready_table)
+
+        # Run the method to be tested.
+        main_trainready.generate_trainready(self.hive_context, util.load_batch_config(cfg),
+            interval_time_in_seconds, log_table, trainready_table, did_bucket_num)
+
+        # Validate the output.
+        df = util.load_df(self.hive_context, trainready_table)
+        self.validate_trainready_output(df, create_unified_log(self.spark))
+
+
+    def test_run (self):
+        # Load the configuration data.
+        with open('pipeline/config_trainready.yml', 'r') as ymlfile:
+            cfg = yaml.safe_load(ymlfile)
+
+        # Create the input data table.
+        log_table = cfg['pipeline']['main_logs']['logs_output_table_name']
+        create_unified_log_table(self.spark, log_table)
+
+        # Clear the output from any previous runs.
+        trainready_table = cfg['pipeline']['main_trainready']['trainready_output_table']
+        util.drop_table(self.hive_context, trainready_table)
+
+        # Run the method to be tested.
+        main_trainready.run(self.hive_context, cfg)
+
+        # Validate the output.
+        df = util.load_df(self.hive_context, trainready_table)
+        print(df.sort('did').show(100, False))
+        print_df_generator_code(df.sort('did'))
+        self.validate_trainready_output(df, create_unified_log(self.spark))
+
+    def validate_trainready_output (self, df, df_input_logs):
+        # Parses a list of list of ints or dicts of ints into a list of dicts/sets.
+        def parse_int_field (values):
+            result = []
+            for value in values:
+                # Evaluate the value as a set or map.
+                # Does not work for strings since there are no quotes.
+                result.append(eval('{' + value + '}'))
+            return result
+
+        # Parses a list of list of strings into a list of sets of strings.
+        def parse_string_field (values):
+            result = []
+            for value in values:
+                temp = set()
+                for val in value.split(','):
+                    temp.add(val)
+                result.append(temp)
+            return result
+
+        # Verify the column names.
+        columns = {'did', 'did_index', 'did_bucket', 'interval_keywords', 
+            'gender', 'age', 'interval_starting_time',
+            'kwi', 'kwi_show_counts', 'kwi_click_counts'}
+        for name in columns:
+            self.assertTrue(name in df.columns)
+
+        # Check that the number of rows is the same of the number of
+        # unique did's.
+        self.assertEqual(df.count(), df_input_logs.select('did').distinct().count())
+
+        seen_did = set()
+        for did in [ i['did'] for i in df_input_logs.select('did').distinct().collect() ]:
+            # Get the rows of the input and output data set for this did.
+            seen_did.add(did)
+            df_did = df.where(col('did') == did)
+            df_input_did = df_input_logs.where(col('did') == did)
+
+            # Verify that the output has only one row for this did.
+            self.assertEqual(df_did.count(), 1)
+            print(df_did.show(1, False))
+            print(df_input_did.show(100, False))
+
+            # Verify the non-aggregate columns.
+            df_did_row = df_did.collect()[0]
+            for column in ['did_bucket', 'age', 'gender']:
+                self.assertEqual(df_did_row[column], 
+                    df_input_did.collect()[0][column])
+
+            # Parse the show and click count fields.
+            show_counts = parse_int_field(df_did_row['kwi_show_counts'])
+            click_counts = parse_int_field(df_did_row['kwi_click_counts'])
+
+            # Parse the keyword fields.
+            keywords = parse_string_field(df_did_row['interval_keywords'])
+            kwi = parse_int_field(df_did_row['kwi'])
+
+            # Verify the aggregate columns by start time.
+            seen_start_times = set()
+            for start_time in [ int(i['interval_starting_time']) for i in df_input_did.select('interval_starting_time').distinct().collect() ]:
+                seen_start_times.add(start_time)
+
+                # Find the index of the start time.
+                index = -1
+                for i, time in enumerate(df_did_row['interval_starting_time']):
+                    if int(time) == start_time:
+                        index = i
+                # Fail if the start time is not found.
+                self.assertNotEqual(index, -1)
+
+                # Get the rows of the input dataframe for this start time.
+                df_st = df_input_did.where(col('interval_starting_time') == str(start_time))
+
+                # Iterate through the keyword indexes for this start time.
+                keyword_indexes = df_st.select('keyword_index').distinct().collect()
+                seen_kwi = set()
+                seen_keywords = set()
+                for keyword_index in [ int(i['keyword_index']) for i in keyword_indexes ]:
+                    # Get the rows of the input dataframe with this did, start time, and keyword index.
+                    seen_kwi.add(keyword_index)
+                    df_kw = df_st.where(col('keyword_index') == keyword_index)
+                    # print(df_kw.show(10, False))
+
+                    # Validate the keyword and keyword index.
+                    self.assertTrue(keyword_index in kwi[index])
+                    for row in df_kw.collect():
+                        self.assertTrue(int(row['keyword_index']) in kwi[index])
+                        self.assertTrue(row['keyword'] in keywords[index])
+                        seen_keywords.add(row['keyword'])
+
+                    # Get the show and click count this this did, start time, and keyword index.
+                    show_count = df_kw.where(col('is_click') == 0).count()
+                    click_count = df_kw.where(col('is_click') == 1).count()
+
+                    # Validate the click and show counts.
+                    self.assertEqual(show_counts[index][keyword_index], show_count)
+                    self.assertEqual(click_counts[index][keyword_index], click_count)
+
+                # Check for any additional keyword indices that should not be there.
+                self.assertEqual(len(show_counts[index]), len(seen_kwi))
+                for key in show_counts[index].keys():
+                    self.assertTrue(key in seen_kwi)
+                self.assertEqual(len(click_counts[index]), len(seen_kwi))
+                for key in click_counts[index].keys():
+                    self.assertTrue(key in seen_kwi)
+                self.assertEqual(len(kwi[index]), len(seen_kwi))
+                for i in kwi[index]:
+                    self.assertTrue(i in seen_kwi)
+                self.assertEqual(len(keywords[index]), len(seen_keywords))
+                for i in keywords[index]:
+                    self.assertTrue(i in seen_keywords)
+
+            # Check for any additional start times that should not be there.
+            self.assertEqual(len(df_did_row['interval_starting_time']), len(seen_start_times))
+            for i in df_did_row['interval_starting_time']:
+                self.assertTrue(int(i) in seen_start_times)
+
+        # Check for any did's that shouldn't be there.
+        output_did = [ i['did'] for i in df_input_logs.select('did').distinct().collect()]
+        self.assertEqual(len(output_did), len(seen_did))
+        for i in output_did:
+            self.assertTrue(i in seen_did)
+
+# Runs the tests.
+if __name__ == '__main__':
+    # Run the unit tests.
+    unittest.main()
diff --git a/Model/lookalike-model/tests/run_test.sh b/Model/lookalike-model/tests/run_test.sh
index be1d942..e4fdff9 100644
--- a/Model/lookalike-model/tests/run_test.sh
+++ b/Model/lookalike-model/tests/run_test.sh
@@ -4,14 +4,19 @@
 cd $DIR
 
 # test_main_clean: preparing cleaned persona, click and show logs data.
-if true
+if false
 then
     spark-submit --master yarn --num-executors 5 --executor-cores 2 --conf spark.hadoop.hive.exec.dynamic.partition=true --conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict pipeline/test_main_clean.py
 fi
 
 # test_main_logs: merges click and show log data.
-if true
+if false
 then
     spark-submit --master yarn --num-executors 5 --executor-cores 2 --conf spark.hadoop.hive.exec.dynamic.partition=true --conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict pipeline/test_main_logs.py
 fi
 
+# test_main_logs: merges click and show log data.
+if true
+then
+    spark-submit --master yarn --num-executors 5 --executor-cores 2 --conf spark.hadoop.hive.exec.dynamic.partition=true --conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict pipeline/test_main_trainready.py
+fi