Merge pull request #2 from radibnia77/main
Add tests
diff --git a/Model/lookalike-model/lookalike_model/pipeline/util.pyc b/Model/lookalike-model/lookalike_model/pipeline/util.pyc
deleted file mode 100644
index ef7660f..0000000
--- a/Model/lookalike-model/lookalike_model/pipeline/util.pyc
+++ /dev/null
Binary files differ
diff --git a/Model/lookalike-model/lookalike_model/trainer/lookalike_build_dataset.py b/Model/lookalike-model/lookalike_model/trainer/lookalike_build_dataset.py
index cb10d62..b515876 100644
--- a/Model/lookalike-model/lookalike_model/trainer/lookalike_build_dataset.py
+++ b/Model/lookalike-model/lookalike_model/trainer/lookalike_build_dataset.py
@@ -35,11 +35,11 @@
def __data_parser(serialized_example):
features = tf.parse_single_example(serialized_example,
- features={'interval_starting_time': tf.FixedLenSequenceFeature([], tf.int64, allow_missing=True),
- 'keywords' :tf.FixedLenSequenceFeature([], tf.string, allow_missing=True),
+ features={'interval_starting_time': tf.FixedLenSequenceFeature([], tf.string, allow_missing=True),
+ 'kwi' :tf.FixedLenSequenceFeature([], tf.string, allow_missing=True),
'did_index': tf.FixedLenFeature([], tf.int64),
- 'click_counts': tf.FixedLenSequenceFeature([], tf.string, allow_missing=True),
- 'show_counts': tf.FixedLenSequenceFeature([], tf.string, allow_missing=True),
+ 'kwi_click_counts': tf.FixedLenSequenceFeature([], tf.string, allow_missing=True),
+ 'kwi_show_counts': tf.FixedLenSequenceFeature([], tf.string, allow_missing=True),
# 'media_category_index': tf.FixedLenFeature([], tf.int64),
# 'net_type_index': tf.FixedLenFeature([], tf.int64),
'gender': tf.FixedLenFeature([], tf.int64),
@@ -49,11 +49,11 @@
})
did_str = tf.cast(features['did'], tf.string)
- time_interval = tf.cast(features['interval_starting_time'], tf.int64)
- keyword = tf.cast(features['keywords'], tf.string)
+ time_interval = tf.cast(features['interval_starting_time'], tf.string)
+ keyword = tf.cast(features['kwi'], tf.string)
ucdoc = tf.cast(features['did_index'], tf.int64)
- click_counts = tf.cast(features['click_counts'], tf.string)
- show_counts = tf.cast(features['show_counts'], tf.string)
+ click_counts = tf.cast(features['kwi_click_counts'], tf.string)
+ show_counts = tf.cast(features['kwi_show_counts'], tf.string)
# media_category = tf.cast(features['media_category_index'], tf.int64)
# net_type_index = tf.cast(features['net_type_index'], tf.int64)
gender = tf.cast(features['gender'], tf.int64)
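For reference, `tf.parse_single_example` and `tf.FixedLenSequenceFeature` are TensorFlow 1.x APIs; a minimal sketch of how a parser with this kind of feature spec is typically wired into a TFRecord input pipeline (the file path and the reduced three-field spec here are illustrative, not taken from the script):

```python
import tensorflow as tf

def _parse(serialized_example):
    # Same style of feature spec as __data_parser above, reduced to three fields:
    # variable-length string sequences plus a scalar int64 index.
    features = tf.parse_single_example(
        serialized_example,
        features={'interval_starting_time': tf.FixedLenSequenceFeature([], tf.string, allow_missing=True),
                  'kwi': tf.FixedLenSequenceFeature([], tf.string, allow_missing=True),
                  'did_index': tf.FixedLenFeature([], tf.int64)})
    return features['interval_starting_time'], features['kwi'], features['did_index']

# Hypothetical TFRecord path; the real script reads its input location elsewhere.
dataset = tf.data.TFRecordDataset('lookalike_tfrecords/part-00000')
next_el = dataset.map(_parse).make_one_shot_iterator().get_next()

with tf.Session() as sess:
    print(sess.run(next_el))
```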
@@ -126,11 +126,14 @@
with open(stats, 'rb') as f:
ucdoc_num = pickle.load(f)['distinct_records_count']
counter = 0
+ mapping = []
for i in range(ucdoc_num):
x = sess.run(next_el)
- log = list(x[0:7])
- time_interval, ucdoc, click_counts, show_counts, gender, age, keyword = log[0], log[1], log[2], log[3], log[4], log[5], log[6]
+ log = list(x[0:8])
+ time_interval_s, ucdoc, click_counts, show_counts, gender, age, keyword, did = log[0], log[1], log[2], log[3], log[4], log[5], log[6], log[7]
+ mapping.append([did, ucdoc])
+ time_interval = [int(i) for i in time_interval_s]
keyword_int = [[int(i) for i in keyword[j].decode().split(",")] for j in range(len(keyword))]
show_counts_list = str_to_intlist(show_counts)
click_counts_list = str_to_intlist(click_counts)
@@ -186,17 +189,20 @@
cate_list = np.array([x for x in range(30)])
user_count, item_count , cate_count = len(set(ucdoc_lst)) , 30, 30
print(counter)
- with open('label_gdin_30.pkl', 'wb') as f:
+ mapping = pd.DataFrame(mapping)
+ mapping.to_csv("mapping_pipeline.csv")
+ with open('label_lookalike.pkl', 'wb') as f:
pickle.dump(label, f, pickle.HIGHEST_PROTOCOL)
pickle.dump(user_att,f, pickle.HIGHEST_PROTOCOL )
- with open('ad_dataset_gdin_30.pkl', 'wb') as f:
+ with open('ad_dataset_lookalike.pkl', 'wb') as f:
pickle.dump(train_set, f, pickle.HIGHEST_PROTOCOL)
pickle.dump(test_set, f, pickle.HIGHEST_PROTOCOL)
pickle.dump(cate_list, f, pickle.HIGHEST_PROTOCOL)
pickle.dump((user_count, item_count, cate_count), f, pickle.HIGHEST_PROTOCOL)
+
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Prepare data')
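For readers of the renamed outputs: a sketch of loading the artifacts this script now writes, assuming only what the dump calls above establish (the `pickle.load` order mirrors the `pickle.dump` order):

```python
import pickle
import pandas as pd

# Load order mirrors the dump order above.
with open('label_lookalike.pkl', 'rb') as f:
    label = pickle.load(f)
    user_att = pickle.load(f)

with open('ad_dataset_lookalike.pkl', 'rb') as f:
    train_set = pickle.load(f)
    test_set = pickle.load(f)
    cate_list = pickle.load(f)
    user_count, item_count, cate_count = pickle.load(f)

# The did -> did_index pairs collected into mapping_pipeline.csv.
mapping = pd.read_csv('mapping_pipeline.csv', index_col=0)
```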
diff --git a/Model/lookalike-model/tests/pipeline/config_trainready.yml b/Model/lookalike-model/tests/pipeline/config_trainready.yml
new file mode 100644
index 0000000..a268f22
--- /dev/null
+++ b/Model/lookalike-model/tests/pipeline/config_trainready.yml
@@ -0,0 +1,26 @@
+product_tag: 'lookalike'
+pipeline_tag: 'unittest'
+log:
+ level: 'ERROR' # log level for spark and app
+pipeline:
+ main_clean:
+ did_bucket_num: 2 # Number of partitions for did
+ load_logs_in_minutes: 1440 # 1440 minutes = 1 day (original value was 14400)
+ conditions: {
+ 'new_slot_id_list': [
+ 'abcdef0', 'abcdef1', 'abcdef2', 'abcdef3', 'abcdef4',
+ 'abcdef5', 'abcdef6', 'abcdef7', 'abcdef8', 'abcdef9'
+ ],
+ 'new_slot_id_app_name_list': [
+ 'Huawei Magazine', 'Huawei Browser', 'Huawei Video', 'Huawei Music', 'Huawei Reading',
+ 'Huawei Magazine', 'Huawei Browser', 'Huawei Video', 'Huawei Music', 'Huawei Reading'
+ ],
+ 'starting_date': '2020-01-01',
+ 'ending_date': '2020-01-11'
+ }
+ main_logs:
+ interval_time_in_seconds: 86400 # 86400 seconds = 1 day; logs are grouped into intervals of this length.
+ logs_output_table_name: 'lookalike_unittest_trainready_input'
+ main_trainready:
+ trainready_output_table: 'lookalike_unittest_trainready_output'
+
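The `interval_time_in_seconds` value defines the log grouping; in the test fixtures below, `interval_starting_time` is always `action_time_seconds` floored to the interval boundary. A sketch of that (assumed) relationship, using values from the `create_unified_log` fixtures:

```python
interval_time_in_seconds = 86400  # from the config above

def interval_start(action_time_seconds):
    # Floor the epoch timestamp to the start of its interval.
    return action_time_seconds - action_time_seconds % interval_time_in_seconds

# Values taken from the create_unified_log fixture data.
assert interval_start(1577910896) == 1577836800
assert interval_start(1577997296) == 1577923200
```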
diff --git a/Model/lookalike-model/tests/pipeline/data_generator.py b/Model/lookalike-model/tests/pipeline/data_generator.py
index 1b33f91..4c9d90b 100644
--- a/Model/lookalike-model/tests/pipeline/data_generator.py
+++ b/Model/lookalike-model/tests/pipeline/data_generator.py
@@ -63,6 +63,10 @@
df = create_keywords(spark)
write_to_table(df, table_name)
+# Creates unified log data and writes it to Hive.
+def create_unified_log_table (spark, table_name):
+ df = create_unified_log(spark)
+ write_to_table(df, table_name)
#==========================================
# Create dataframes for the unit tests
@@ -160,6 +164,20 @@
def create_cleaned_log (spark):
data = [
('C000', '0000001', '1000', 'splash', 'abcdef0', 'DUB-AL00', 'WIFI', 'CPC', '2020-01-01 12:34:56.78', 'Huawei Magazine', 0, 0, 'travel', '1', '2020-01-01', '1', ),
+ # ('C000', '0000001', '1000', 'splash', 'abcdef0', 'DUB-AL00', 'WIFI', 'CPC', '2020-01-01 13:34:56.78', 'Huawei Magazine', 0, 0, 'travel', '1', '2020-01-01', '1', ),
+ # ('C000', '0000001', '1000', 'splash', 'abcdef0', 'DUB-AL00', 'WIFI', 'CPC', '2020-01-01 14:34:56.78', 'Huawei Magazine', 0, 0, 'travel', '1', '2020-01-01', '1', ),
+ # ('C000', '0000001', '1000', 'splash', 'abcdef0', 'DUB-AL00', 'WIFI', 'CPC', '2020-01-01 15:34:56.78', 'Huawei Magazine', 0, 0, 'travel', '1', '2020-01-01', '1', ),
+ # ('C000', '0000001', '1000', 'splash', 'abcdef0', 'DUB-AL00', 'WIFI', 'CPC', '2020-01-01 15:59:59.00', 'Huawei Magazine', 0, 0, 'travel', '1', '2020-01-01', '1', ),
+ # ('C000', '0000001', '1000', 'splash', 'abcdef0', 'DUB-AL00', 'WIFI', 'CPC', '2020-01-01 15:59:59.99', 'Huawei Magazine', 0, 0, 'travel', '1', '2020-01-01', '1', ),
+ # ('C000', '0000001', '1000', 'splash', 'abcdef0', 'DUB-AL00', 'WIFI', 'CPC', '2020-01-01 16:00:00.00', 'Huawei Magazine', 0, 0, 'travel', '1', '2020-01-01', '1', ),
+ # ('C000', '0000001', '1000', 'splash', 'abcdef0', 'DUB-AL00', 'WIFI', 'CPC', '2020-01-01 16:34:56.78', 'Huawei Magazine', 0, 0, 'travel', '1', '2020-01-01', '1', ),
+ # ('C000', '0000001', '1000', 'splash', 'abcdef0', 'DUB-AL00', 'WIFI', 'CPC', '2020-01-01 17:34:56.78', 'Huawei Magazine', 0, 0, 'travel', '1', '2020-01-01', '1', ),
+ # ('C000', '0000001', '1000', 'splash', 'abcdef0', 'DUB-AL00', 'WIFI', 'CPC', '2020-01-01 18:34:56.78', 'Huawei Magazine', 0, 0, 'travel', '1', '2020-01-01', '1', ),
+ # ('C000', '0000001', '1000', 'splash', 'abcdef0', 'DUB-AL00', 'WIFI', 'CPC', '2020-01-01 19:34:56.78', 'Huawei Magazine', 0, 0, 'travel', '1', '2020-01-01', '1', ),
+ # ('C000', '0000001', '1000', 'splash', 'abcdef0', 'DUB-AL00', 'WIFI', 'CPC', '2020-01-01 20:34:56.78', 'Huawei Magazine', 0, 0, 'travel', '1', '2020-01-01', '1', ),
+ # ('C000', '0000001', '1000', 'splash', 'abcdef0', 'DUB-AL00', 'WIFI', 'CPC', '2020-01-01 21:34:56.78', 'Huawei Magazine', 0, 0, 'travel', '1', '2020-01-01', '1', ),
+ # ('C000', '0000001', '1000', 'splash', 'abcdef0', 'DUB-AL00', 'WIFI', 'CPC', '2020-01-01 22:34:56.78', 'Huawei Magazine', 0, 0, 'travel', '1', '2020-01-01', '1', ),
+ # ('C000', '0000001', '1000', 'splash', 'abcdef0', 'DUB-AL00', 'WIFI', 'CPC', '2020-01-01 23:34:56.78', 'Huawei Magazine', 0, 0, 'travel', '1', '2020-01-01', '1', ),
('C001', '0000002', '1000', 'splash', 'abcdef1', 'DUB-AL00', 'WIFI', 'CPC', '2020-01-02 12:34:56.78', 'Huawei Browser', 1, 0, 'travel', '1', '2020-01-02', '1', ),
('C002', '0000003', '1001', 'native', 'abcdef2', 'ABC-AL00', '4G', 'CPD', '2020-01-03 12:34:56.78', 'Huawei Video', 0, 1, 'travel', '1', '2020-01-03', '1', ),
('C010', '0000004', '1001', 'native', 'abcdef3', 'ABC-AL00', '4G', 'CPD', '2020-01-04 12:34:56.78', 'Huawei Music', 1, 1, 'game-avg', '2', '2020-01-04', '1', ),
@@ -220,6 +238,85 @@
return spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
+# Returns a dataframe with unified log data.
+def create_unified_log (spark):
+ data = [
+ ('0000001', 0, '2020-01-01 12:34:56.78', 'travel', '1', 'splash', 'WIFI', 0, 0, '1000', 1577836800, 1577910896, '2020-01-01', '1', ),
+ ('0000001', 1, '2020-01-01 12:34:56.78', 'travel', '1', 'splash', 'WIFI', 0, 0, '1000', 1577836800, 1577910896, '2020-01-01', '1', ),
+ ('0000001', 1, '2020-01-01 12:34:56.78', 'travel', '1', 'splash', 'WIFI', 0, 0, '1000', 1577836800, 1577910896, '2020-01-01', '1', ),
+ ('0000001', 1, '2020-01-01 12:34:56.78', 'travel', '1', 'splash', 'WIFI', 0, 0, '1000', 1577836800, 1577910896, '2020-01-01', '1', ),
+ ('0000001', 0, '2020-01-01 12:34:56.78', 'game-avg', '2', 'splash', 'WIFI', 0, 0, '1000', 1577836800, 1577910896, '2020-01-01', '1', ),
+ ('0000001', 0, '2020-01-01 12:34:56.78', 'game-avg', '2', 'splash', 'WIFI', 0, 0, '1000', 1577836800, 1577910896, '2020-01-01', '1', ),
+ ('0000001', 1, '2020-01-01 12:34:56.78', 'game-avg', '2', 'splash', 'WIFI', 0, 0, '1000', 1577836800, 1577910896, '2020-01-01', '1', ),
+ ('0000001', 1, '2020-01-01 12:34:56.78', 'game-avg', '2', 'splash', 'WIFI', 0, 0, '1000', 1577836800, 1577910896, '2020-01-01', '1', ),
+ ('0000001', 0, '2020-01-01 12:34:56.78', 'reading', '3', 'splash', 'WIFI', 0, 0, '1000', 1577836800, 1577910896, '2020-01-01', '1', ),
+ ('0000001', 0, '2020-01-01 12:34:56.78', 'reading', '3', 'splash', 'WIFI', 0, 0, '1000', 1577836800, 1577910896, '2020-01-01', '1', ),
+ ('0000001', 0, '2020-01-01 12:34:56.78', 'reading', '3', 'splash', 'WIFI', 0, 0, '1000', 1577836800, 1577910896, '2020-01-01', '1', ),
+ ('0000001', 1, '2020-01-01 12:34:56.78', 'reading', '3', 'splash', 'WIFI', 0, 0, '1000', 1577836800, 1577910896, '2020-01-01', '1', ),
+ ('0000001', 0, '2020-01-02 12:34:56.78', 'travel', '1', 'splash', 'WIFI', 0, 0, '1000', 1577923200, 1577997296, '2020-01-02', '1', ),
+ ('0000001', 1, '2020-01-02 12:34:56.78', 'travel', '1', 'splash', 'WIFI', 0, 0, '1000', 1577923200, 1577997296, '2020-01-02', '1', ),
+ ('0000001', 0, '2020-01-03 12:34:56.78', 'travel', '1', 'native', '4G', 0, 0, '1001', 1578009600, 1578083696, '2020-01-03', '1', ),
+ ('0000001', 1, '2020-01-03 12:34:56.78', 'travel', '1', 'native', '4G', 0, 0, '1001', 1578009600, 1578083696, '2020-01-03', '1', ),
+ ('0000002', 0, '2020-01-02 12:34:56.78', 'travel', '1', 'splash', 'WIFI', 1, 0, '1000', 1577923200, 1577997296, '2020-01-02', '1', ),
+ ('0000002', 0, '2020-01-02 12:34:56.78', 'travel', '1', 'splash', 'WIFI', 1, 0, '1000', 1577923200, 1577997296, '2020-01-02', '1', ),
+ ('0000002', 0, '2020-01-02 12:34:56.78', 'travel', '1', 'splash', 'WIFI', 1, 0, '1000', 1577923200, 1577997296, '2020-01-02', '1', ),
+ ('0000003', 1, '2020-01-03 12:34:56.78', 'travel', '1', 'native', '4G', 0, 1, '1001', 1578009600, 1578083696, '2020-01-03', '1', ),
+ ('0000003', 1, '2020-01-03 12:34:56.78', 'travel', '1', 'native', '4G', 0, 1, '1001', 1578009600, 1578083696, '2020-01-03', '1', ),
+ ('0000003', 1, '2020-01-03 12:34:56.78', 'travel', '1', 'native', '4G', 0, 1, '1001', 1578009600, 1578083696, '2020-01-03', '1', ),
+ ]
+
+ schema = StructType([
+ StructField('did', StringType(), True),
+ StructField('is_click', IntegerType(), True),
+ StructField('action_time', StringType(), True),
+ StructField('keyword', StringType(), True),
+ StructField('keyword_index', StringType(), True),
+ StructField('media', StringType(), True),
+ StructField('net_type', StringType(), True),
+ StructField('gender', IntegerType(), True),
+ StructField('age', IntegerType(), True),
+ StructField('adv_id', StringType(), True),
+ StructField('interval_starting_time', IntegerType(), True),
+ StructField('action_time_seconds', IntegerType(), True),
+ StructField('day', StringType(), True),
+ StructField('did_bucket', StringType(), True),
+ ])
+
+ return spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
+
+# Returns a dataframe with trainready data.
+def create_trainready_data (spark):
+ data = [
+ (0, 0, '0000001', 1000000001, [u'1578009600', u'1577923200', u'1577836800'], [u'travel', u'travel', u'travel,game-avg'], [u'1', u'1', u'1,2'], [u'1:2', u'1:2', u'1:2,2:1'], [u'1:1', u'1:1', u'1:1,2:0'], '1', ),
+ (0, 1, '0000002', 1000000002, [u'1577923200'], [u'travel'], [u'1'], [u'1:2'], [u'1:1'], '1', ),
+ (1, 0, '0000003', 1000000003, [u'1578009600'], [u'travel'], [u'1'], [u'1:2'], [u'1:1'], '1', ),
+ (1, 1, '0000004', 1000000004, [u'1578096000'], [u'game-avg'], [u'2'], [u'2:2'], [u'2:1'], '1', ),
+ (2, 0, '0000005', 1000000005, [u'1578182400'], [u'game-avg'], [u'2'], [u'2:2'], [u'2:1'], '1', ),
+ (2, 1, '0000006', 1, [u'1578268800'], [u'game-avg'], [u'2'], [u'2:2'], [u'2:1'], '0', ),
+ (3, 0, '0000007', 2, [u'1578355200'], [u'reading'], [u'3'], [u'3:2'], [u'3:1'], '0', ),
+ (3, 1, '0000008', 3, [u'1578441600'], [u'reading'], [u'3'], [u'3:2'], [u'3:1'], '0', ),
+ (4, 0, '0000009', 4, [u'1578528000'], [u'reading'], [u'3'], [u'3:2'], [u'3:1'], '0', ),
+ (4, 1, '0000010', 1000000006, [u'1578614400'], [u'reading'], [u'3'], [u'3:2'], [u'3:1'], '1', ),
+ ]
+
+ schema = StructType([
+ StructField('age', IntegerType(), True),
+ StructField('gender', IntegerType(), True),
+ StructField('did', StringType(), True),
+ StructField('did_index', LongType(), True),
+ StructField('interval_starting_time', ArrayType(StringType()), True),
+ StructField('interval_keywords', ArrayType(StringType()), True),
+ StructField('kwi', ArrayType(StringType()), True),
+ StructField('kwi_show_counts', ArrayType(StringType()), True),
+ StructField('kwi_click_counts', ArrayType(StringType()), True),
+ StructField('did_bucket', StringType(), True),
+ ])
+
+ return spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
+
+
# Prints to screen the code to generate the given data frame.
def print_df_generator_code (df):
columns = df.columns
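The `kwi_show_counts`/`kwi_click_counts` fixtures above encode per-interval counts as comma-separated `keyword_index:count` pairs (e.g. `'1:2,2:1'`). A hypothetical helper showing how such a string decodes:

```python
def parse_kwi_counts(count_str):
    # '1:2,2:1' -> {1: 2, 2: 1}: keyword index mapped to its count.
    return {int(k): int(v)
            for k, v in (pair.split(':') for pair in count_str.split(','))}

assert parse_kwi_counts('1:2,2:1') == {1: 2, 2: 1}
assert parse_kwi_counts('3:2') == {3: 2}
```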
diff --git a/Model/lookalike-model/tests/pipeline/test_main_clean.py b/Model/lookalike-model/tests/pipeline/test_main_clean.py
index 9163c86..3102fa3 100644
--- a/Model/lookalike-model/tests/pipeline/test_main_clean.py
+++ b/Model/lookalike-model/tests/pipeline/test_main_clean.py
@@ -144,6 +144,7 @@
# Validate the cleaned clicklog table.
df_clicklog = util.load_df(self.hive_context, clicklog_output_table)
+ df_clicklog.sort('action_time').show(100, False)
self.validate_cleaned_log(df_clicklog, conditions, df_persona, df_keywords, df_log, cfg['pipeline']['main_clean']['did_bucket_num'])
# Validate the cleaned showlog table.
diff --git a/Model/lookalike-model/tests/pipeline/test_main_logs.py b/Model/lookalike-model/tests/pipeline/test_main_logs.py
index 46112b7..69d8c7a 100644
--- a/Model/lookalike-model/tests/pipeline/test_main_logs.py
+++ b/Model/lookalike-model/tests/pipeline/test_main_logs.py
@@ -83,14 +83,15 @@
# Validate the output.
df = util.load_df(self.hive_context, log_output_table)
+ df.sort('action_time_seconds').show(100, False)
print_df_generator_code(df.sort('did', 'is_click'))
self.validate_unified_logs(df, create_cleaned_log(self.spark))
def validate_unified_logs (self, df, df_log):
# Verify the column names.
- columns = ['is_click', 'did', 'adv_id', 'media',
- 'net_type', 'action_time', 'gender', 'age',
+ columns = ['is_click', 'did', 'adv_id', 'media', 'action_time_seconds',
+ 'net_type', 'action_time', 'gender', 'age', 'interval_starting_time',
'keyword', 'keyword_index', 'day', 'did_bucket']
for name in columns:
self.assertTrue(name in df.columns)
diff --git a/Model/lookalike-model/tests/pipeline/test_main_trainready.py b/Model/lookalike-model/tests/pipeline/test_main_trainready.py
new file mode 100644
index 0000000..8e2dd30
--- /dev/null
+++ b/Model/lookalike-model/tests/pipeline/test_main_trainready.py
@@ -0,0 +1,213 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0.html
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+import yaml
+from pyspark import SparkContext
+from pyspark.sql import SparkSession, HiveContext
+from pyspark.sql.functions import col
+from pyspark.sql.types import IntegerType
+from lookalike_model.pipeline import main_trainready, util
+from data_generator import *
+
+class TestMainTrainReady (unittest.TestCase):
+
+ def setUp (self):
+ # Set the log level.
+ sc = SparkContext.getOrCreate()
+ sc.setLogLevel('ERROR')
+
+ # Initialize the Spark session
+ self.spark = SparkSession.builder.appName('unit test').enableHiveSupport().getOrCreate()
+ self.hive_context = HiveContext(sc)
+
+ def test_generate_trainready (self):
+ print('*** Running test_generate_trainready test ***')
+
+ # Load the configuration data.
+ with open('pipeline/config_trainready.yml', 'r') as ymlfile:
+ cfg = yaml.safe_load(ymlfile)
+
+ # Load the training parameters.
+ did_bucket_num = cfg['pipeline']['main_clean']['did_bucket_num']
+ interval_time_in_seconds = cfg['pipeline']['main_logs']['interval_time_in_seconds']
+ log_table = cfg['pipeline']['main_logs']['logs_output_table_name']
+ trainready_table = cfg['pipeline']['main_trainready']['trainready_output_table']
+
+ # Create the input data table.
+ create_unified_log_table(self.spark, log_table)
+
+ # Clear the output from any previous runs.
+ util.drop_table(self.hive_context, trainready_table)
+
+ # Run the method to be tested.
+ main_trainready.generate_trainready(self.hive_context, util.load_batch_config(cfg),
+ interval_time_in_seconds, log_table, trainready_table, did_bucket_num)
+
+ # Validate the output.
+ df = util.load_df(self.hive_context, trainready_table)
+ self.validate_trainready_output(df, create_unified_log(self.spark))
+
+
+ def test_run (self):
+ # Load the configuration data.
+ with open('pipeline/config_trainready.yml', 'r') as ymlfile:
+ cfg = yaml.safe_load(ymlfile)
+
+ # Create the input data table.
+ log_table = cfg['pipeline']['main_logs']['logs_output_table_name']
+ create_unified_log_table(self.spark, log_table)
+
+ # Clear the output from any previous runs.
+ trainready_table = cfg['pipeline']['main_trainready']['trainready_output_table']
+ util.drop_table(self.hive_context, trainready_table)
+
+ # Run the method to be tested.
+ main_trainready.run(self.hive_context, cfg)
+
+ # Validate the output.
+ df = util.load_df(self.hive_context, trainready_table)
+ df.sort('did').show(100, False)
+ print_df_generator_code(df.sort('did'))
+ self.validate_trainready_output(df, create_unified_log(self.spark))
+
+ def validate_trainready_output (self, df, df_input_logs):
+ # Parses a list of strings like '1:2,2:1' into a list of dicts (or sets) of ints.
+ def parse_int_field (values):
+ result = []
+ for value in values:
+ # Evaluate the value as a set or map.
+ # Does not work for strings since there are no quotes.
+ result.append(eval('{' + value + '}'))
+ return result
+
+ # Parses a list of comma-separated strings into a list of sets of strings.
+ def parse_string_field (values):
+ result = []
+ for value in values:
+ temp = set()
+ for val in value.split(','):
+ temp.add(val)
+ result.append(temp)
+ return result
+
+ # Verify the column names.
+ columns = {'did', 'did_index', 'did_bucket', 'interval_keywords',
+ 'gender', 'age', 'interval_starting_time',
+ 'kwi', 'kwi_show_counts', 'kwi_click_counts'}
+ for name in columns:
+ self.assertTrue(name in df.columns)
+
+ # Check that the number of rows is the same as the number of
+ # unique did's.
+ self.assertEqual(df.count(), df_input_logs.select('did').distinct().count())
+
+ seen_did = set()
+ for did in [ i['did'] for i in df_input_logs.select('did').distinct().collect() ]:
+ # Get the rows of the input and output data set for this did.
+ seen_did.add(did)
+ df_did = df.where(col('did') == did)
+ df_input_did = df_input_logs.where(col('did') == did)
+
+ # Verify that the output has only one row for this did.
+ self.assertEqual(df_did.count(), 1)
+ df_did.show(1, False)
+ df_input_did.show(100, False)
+
+ # Verify the non-aggregate columns.
+ df_did_row = df_did.collect()[0]
+ for column in ['did_bucket', 'age', 'gender']:
+ self.assertEqual(df_did_row[column],
+ df_input_did.collect()[0][column])
+
+ # Parse the show and click count fields.
+ show_counts = parse_int_field(df_did_row['kwi_show_counts'])
+ click_counts = parse_int_field(df_did_row['kwi_click_counts'])
+
+ # Parse the keyword fields.
+ keywords = parse_string_field(df_did_row['interval_keywords'])
+ kwi = parse_int_field(df_did_row['kwi'])
+
+ # Verify the aggregate columns by start time.
+ seen_start_times = set()
+ for start_time in [ int(i['interval_starting_time']) for i in df_input_did.select('interval_starting_time').distinct().collect() ]:
+ seen_start_times.add(start_time)
+
+ # Find the index of the start time.
+ index = -1
+ for i, time in enumerate(df_did_row['interval_starting_time']):
+ if int(time) == start_time:
+ index = i
+ # Fail if the start time is not found.
+ self.assertNotEqual(index, -1)
+
+ # Get the rows of the input dataframe for this start time.
+ df_st = df_input_did.where(col('interval_starting_time') == str(start_time))
+
+ # Iterate through the keyword indexes for this start time.
+ keyword_indexes = df_st.select('keyword_index').distinct().collect()
+ seen_kwi = set()
+ seen_keywords = set()
+ for keyword_index in [ int(i['keyword_index']) for i in keyword_indexes ]:
+ # Get the rows of the input dataframe with this did, start time, and keyword index.
+ seen_kwi.add(keyword_index)
+ df_kw = df_st.where(col('keyword_index') == keyword_index)
+ # print(df_kw.show(10, False))
+
+ # Validate the keyword and keyword index.
+ self.assertTrue(keyword_index in kwi[index])
+ for row in df_kw.collect():
+ self.assertTrue(int(row['keyword_index']) in kwi[index])
+ self.assertTrue(row['keyword'] in keywords[index])
+ seen_keywords.add(row['keyword'])
+
+ # Get the show and click counts for this did, start time, and keyword index.
+ show_count = df_kw.where(col('is_click') == 0).count()
+ click_count = df_kw.where(col('is_click') == 1).count()
+
+ # Validate the click and show counts.
+ self.assertEqual(show_counts[index][keyword_index], show_count)
+ self.assertEqual(click_counts[index][keyword_index], click_count)
+
+ # Check for any additional keyword indices that should not be there.
+ self.assertEqual(len(show_counts[index]), len(seen_kwi))
+ for key in show_counts[index].keys():
+ self.assertTrue(key in seen_kwi)
+ self.assertEqual(len(click_counts[index]), len(seen_kwi))
+ for key in click_counts[index].keys():
+ self.assertTrue(key in seen_kwi)
+ self.assertEqual(len(kwi[index]), len(seen_kwi))
+ for i in kwi[index]:
+ self.assertTrue(i in seen_kwi)
+ self.assertEqual(len(keywords[index]), len(seen_keywords))
+ for i in keywords[index]:
+ self.assertTrue(i in seen_keywords)
+
+ # Check for any additional start times that should not be there.
+ self.assertEqual(len(df_did_row['interval_starting_time']), len(seen_start_times))
+ for i in df_did_row['interval_starting_time']:
+ self.assertTrue(int(i) in seen_start_times)
+
+ # Check for any did's that shouldn't be there.
+ output_did = [ i['did'] for i in df_input_logs.select('did').distinct().collect()]
+ self.assertEqual(len(output_did), len(seen_did))
+ for i in output_did:
+ self.assertTrue(i in seen_did)
+
+# Runs the tests.
+if __name__ == '__main__':
+ # Run the unit tests.
+ unittest.main()
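A side note on `parse_int_field` in the new test: it relies on `eval` to turn `'1:2,2:1'` into a dict and `'1,2'` into a set. On Python 3, `ast.literal_eval` is a common safer substitute for this pattern (a sketch, not part of the PR):

```python
import ast

def parse_int_field(values):
    # literal_eval parses the same '{1:2,2:1}' dict and '{1,2}' set
    # literals as eval, but refuses arbitrary expressions. Note that
    # set literals require Python 3 here.
    return [ast.literal_eval('{' + value + '}') for value in values]

assert parse_int_field(['1:2,2:1']) == [{1: 2, 2: 1}]
assert parse_int_field(['1,2']) == [{1, 2}]
```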
diff --git a/Model/lookalike-model/tests/run_test.sh b/Model/lookalike-model/tests/run_test.sh
index be1d942..e4fdff9 100644
--- a/Model/lookalike-model/tests/run_test.sh
+++ b/Model/lookalike-model/tests/run_test.sh
@@ -4,14 +4,19 @@
cd $DIR
# test_main_clean: preparing cleaned persona, click and show logs data.
-if true
+if false
then
spark-submit --master yarn --num-executors 5 --executor-cores 2 --conf spark.hadoop.hive.exec.dynamic.partition=true --conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict pipeline/test_main_clean.py
fi
# test_main_logs: merges click and show log data.
-if true
+if false
then
spark-submit --master yarn --num-executors 5 --executor-cores 2 --conf spark.hadoop.hive.exec.dynamic.partition=true --conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict pipeline/test_main_logs.py
fi
+# test_main_trainready: aggregates the unified logs into trainready data.
+if true
+then
+ spark-submit --master yarn --num-executors 5 --executor-cores 2 --conf spark.hadoop.hive.exec.dynamic.partition=true --conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict pipeline/test_main_trainready.py
+fi