add rti transform module

The module transforms request table to impression table
diff --git a/Model/predictor-dl-model/predictor_dl_model/config.yml b/Model/predictor-dl-model/predictor_dl_model/config.yml
index fbc2ff2..16db94f 100644
--- a/Model/predictor-dl-model/predictor_dl_model/config.yml
+++ b/Model/predictor-dl-model/predictor_dl_model/config.yml
@@ -1,12 +1,23 @@
 product_tag: 'dlpm'
 pipeline_tag: '111021_no_residency_no_mapping' # IMPORTANT: The pipeline tag has to be changed before each run to prevent record duplication.
-factdata_table_name: 'factdata_10202021' #factdata_hq_09222020
+factdata_table_name: 'factdata_test_02112022'  # factdata_10202021 #factdata_hq_09222020
 
 log:
   level: 'warn' # log level for spark and app
 
 pipeline:
   config_table: '{product_tag}_{pipeline_tag}_config'
+
+  rti_transform: # This is to transform reques-based factdata to impression-based by filling out empty places
+    default_hour: 7
+    default_price_cat: '1'
+    day_step: 2
+    start_day: '2020-01-01'
+    end_day: '2020-01-02'
+    new_bucket_size: 2
+    input_table: 'factdata_request_01012022'
+    # output_table is factdata_table_name
+
   filter: # This is for data filtering- si and region
     percentile: 10     # This is for filtering traffic less than 1/10 of average traffic
     output_table_name: '{product_tag}_{pipeline_tag}_tmp_area_map'
@@ -51,9 +62,9 @@
     output_table_name: '{product_tag}_{pipeline_tag}_tmp_ts' # name of the hive table that keeps cleansed and normalized data before writing it into tfrecords, over-writes the existing table
     outlier_table: '{product_tag}_{pipeline_tag}_tmp_outlier'
   uckey_clustering: # This is done on whole data, not slicing on buckets
-    pre_cluster_table_name: '{product_tag}_{pipeline_tag}_tmp_pre_cluster'
+    pre_cluster_table_name: '{product_tag}_{pipeline_tag}_tmp_pre_cluster_test_12212021'
     create_pre_cluster_table: True
-    output_table_name: '{product_tag}_{pipeline_tag}_tmp_cluster'
+    output_table_name: '{product_tag}_{pipeline_tag}_tmp_cluster_test_12212021'
     cluster_size:
       number_of_virtual_clusters: 1000
       cluster_dense_num_ratio_cap: 0.01
@@ -64,7 +75,7 @@
       popularity_th: 4
       median_popularity_of_dense: 1856.2833251953125 # median imp of sparse=False, calculate once
   normalization: # This is done on whole data, not slicing on buckets
-    output_table_name: '{product_tag}_{pipeline_tag}_trainready'
+    output_table_name: '{product_tag}_{pipeline_tag}_trainready_test_12212021'
     columns: {
     'price_cat':['1','2','3'], 
     'a': ['','1','2','3','4','5','6'],
@@ -102,8 +113,8 @@
     tfrecords_hdfs_path: 'factdata.tfrecord.{pipeline_tag}' # it is hdfs location for tfrecords, over-writes the existing files
     tf_statistics_path: './tf_statistics_{pipeline_tag}.pkl'
   distribution:
-    output_table_name: '{product_tag}_{pipeline_tag}_tmp_distribution'
-    output_detail_table_name: '{product_tag}_{pipeline_tag}_tmp_distribution_detail'
+    output_table_name: '{product_tag}_{pipeline_tag}_tmp_distribution_test_12212021'
+    output_detail_table_name: '{product_tag}_{pipeline_tag}_tmp_distribution_detail_test_12212021'
 
 tfrecorder_reader:
   tfrecords_local_path: './factdata.tfrecord.{pipeline_tag}' # it us local path for tfrecords, over-writes the existing files
@@ -113,7 +124,7 @@
   start: '' # help="Effective start date. Data before the start is dropped"
   end: '' # help="Effective end date. Data past the end is dropped"
   corr_backoffset: 0 # default=0, type=int, help="Offset for correlation calculation"
-  batch_size: 11880 # batch size of exmaples in tfrecord
+  batch_size: 155000 # batch size of exmaples in tfrecord,
   duration: 82 # time series length, This has to less or equal prepare_past_days
   tf_statistics_path: './tf_statistics_{pipeline_tag}.pkl'
 
@@ -145,12 +156,12 @@
   back_offset: 0 # don't change it.
 
 save_model:
-  table: '{product_tag}_{pipeline_tag}_model_stat'
+  table: '{product_tag}_{pipeline_tag}_model_stat_test_12212021'
   data_dir: data/vars
   ckpt_dir: data/cpt/s32 
   saved_dir: data/vars
   model_version: 'version_{pipeline_tag}'
-  model_name: 'model_{product_tag}_{pipeline_tag}'
+  model_name: 'model_{product_tag}_{pipeline_tag}_test_12212021'
   train_window: 60 # Should be same as the one in hparams
 
 elastic_search:
diff --git a/Model/predictor-dl-model/predictor_dl_model/pipeline/main_rti_transform.py b/Model/predictor-dl-model/predictor_dl_model/pipeline/main_rti_transform.py
new file mode 100644
index 0000000..961f8a0
--- /dev/null
+++ b/Model/predictor-dl-model/predictor_dl_model/pipeline/main_rti_transform.py
@@ -0,0 +1,199 @@
+# Copyright 2019, Futurewei Technologies
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+#                                                 * "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing,
+#  software distributed under the License is distributed on an
+#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+#  KIND, either express or implied.  See the License for the
+#  specific language governing permissions and limitations
+#  under the License.
+
+import yaml
+import argparse
+
+from pyspark import SparkContext
+from pyspark.sql.functions import lit, udf
+from pyspark.sql import HiveContext
+from pyspark.sql.types import StringType, ArrayType
+from datetime import datetime, timedelta
+from util import resolve_placeholder
+import hashlib
+
+
+'''
+This module transform 
+T1 : request based factdata
+T2 : compatible factdata for main_ts.py
+
+T1
+
++-----------------------+---------+-------+
+|col_name               |data_type|comment|
++-----------------------+---------+-------+
+|uckey                  |string   |null   |
+|total                  |int      |null   |
+|pt_d                   |string   |null   |
+|# Partition Information|         |       |
+|# col_name             |data_type|comment|
+|pt_d                   |string   |null   |
++-----------------------+---------+-------+
+CREATE TABLE table_name ( uckey string, total int)                                
+PARTITIONED BY (pt_d string)
+
+
+
+T2
+
+
++-----------------------+-------------+-------+
+|col_name               |data_type    |comment|
++-----------------------+-------------+-------+
+|uckey                  |string       |null   |
+|count_array            |array<string>|null   |
+|hour                   |int          |null   |
+|day                    |string       |null   |
+|bucket_id              |int          |null   |
+|# Partition Information|             |       |
+|# col_name             |data_type    |comment|
+|day                    |string       |null   |
+|bucket_id              |int          |null   |
++-----------------------+-------------+-------+
+
+
+spark-submit --master yarn --num-executors 10 --executor-cores 5 --executor-memory 16G --driver-memory 16G --conf spark.driver.maxResultSize=5G --conf spark.hadoop.hive.exec.dynamic.partition=true --conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict pipeline/main_rti_transform.py config.yml
+
+'''
+
+
+def advance_date(date, day_added):
+    _time = datetime.strptime(date, "%Y-%m-%d")
+    _time = _time + timedelta(days=day_added)
+    return _time.strftime("%Y-%m-%d")
+
+
+def assign_new_bucket_id(df, n):
+    def __hash_sha256(s):
+        hex_value = hashlib.sha256(s.encode('utf-8')).hexdigest()
+        return int(hex_value, 16)
+    _udf = udf(lambda x: __hash_sha256(x) % n)
+    df = df.withColumn('bucket_id', _udf(df.uckey))
+    return df
+
+
+def __save_as_table(df, table_name, hive_context, create_table):
+
+    if create_table:
+        command = """
+            DROP TABLE IF EXISTS {}
+            """.format(table_name)
+
+        hive_context.sql(command)
+
+        command = """
+            CREATE TABLE IF NOT EXISTS {}
+            (
+            uckey string, count_array array<string>, hour int, day string 
+            ) PARTITIONED BY (bucket_id int)
+            """.format(table_name)
+
+        hive_context.sql(command)
+
+    df.select('uckey',
+              'count_array',
+              'hour',
+              'day',
+              'bucket_id'
+              ).write.format('hive').option("header", "true").option("encoding", "UTF-8").mode('append').insertInto(table_name)
+
+
+def run(hive_context,
+        input_table, output_table,
+        start_day, end_day, day_step,
+        new_bucket_size,
+        default_hour, default_price_cat):
+
+    _st = start_day
+    first_round = True
+
+    while True:
+
+        _end = min(end_day, advance_date(_st, day_step))
+
+        if _st > _end:
+            break
+
+        # Read factdata table
+        command = """
+        SELECT uckey, total, pt_d FROM {} WHERE pt_d BETWEEN '{}' AND '{}'
+        """.format(input_table, _st, _end)
+
+        _st = advance_date(_end, 1)
+
+        df = hive_context.sql(command)
+        print(command)
+
+        df = df.withColumnRenamed('pt_d', 'day')
+
+        # add count_array
+        # [Row(count_array=[u'0:0', u'1:0', u'2:0', u'3:0'], day=u'2018-03-09', hour=0, uckey=u'banner,1,3G,g_f,1,pt,1002,icc')]
+        df = df.withColumn('count_array', udf(lambda x: [default_price_cat + ':' + str(x)], ArrayType(StringType()))(df.total))
+
+       # add hour
+        df = df.withColumn('hour', lit(default_hour))
+
+        df = assign_new_bucket_id(df, new_bucket_size)
+
+        # we want to keep the paritions for a batch under 200
+        df = df.repartition(200)
+
+        # Writing into partitions might throw some exceptions but does not impair data.
+        __save_as_table(df, output_table, hive_context, first_round)
+
+        first_round = False
+
+    return
+
+
+if __name__ == "__main__":
+
+    parser = argparse.ArgumentParser(description='Prepare data')
+    parser.add_argument('config_file')
+    args = parser.parse_args()
+
+    # Load config file
+    with open(args.config_file, 'r') as ymlfile:
+        cfg = yaml.load(ymlfile, Loader=yaml.FullLoader)
+        resolve_placeholder(cfg)
+
+    cfg_log = cfg['log']
+    cfg_rti = cfg['pipeline']['rti_transform']
+
+    sc = SparkContext()
+    hive_context = HiveContext(sc)
+    sc.setLogLevel(cfg_log['level'])
+
+    default_hour = cfg_rti['default_hour']
+    default_price_cat = cfg_rti['default_price_cat']
+    day_step = cfg_rti['day_step']
+    start_day = cfg_rti['start_day']
+    end_day = cfg_rti['end_day']
+    new_bucket_size = cfg_rti['new_bucket_size']
+    input_table = cfg_rti['input_table']
+    output_table = cfg['factdata_table_name']
+
+    run(hive_context=hive_context,
+        input_table=input_table, output_table=output_table,
+        start_day=start_day, end_day=end_day, day_step=day_step,
+        new_bucket_size=new_bucket_size,
+        default_hour=default_hour, default_price_cat=default_price_cat)
+
+    sc.stop()
diff --git a/Model/predictor-dl-model/predictor_dl_model/run.sh b/Model/predictor-dl-model/predictor_dl_model/run.sh
index 5005517..23b22a7 100644
--- a/Model/predictor-dl-model/predictor_dl_model/run.sh
+++ b/Model/predictor-dl-model/predictor_dl_model/run.sh
@@ -5,6 +5,12 @@
     spark-submit --master yarn --num-executors 10 --executor-cores 5 --executor-memory 16G --driver-memory 16G --conf spark.driver.maxResultSize=5G pipeline/show_config.py config.yml
 fi
 
+#This module transform T1 : request based factdata to T2 : compatible factdata for rest of pipelie
+if false
+then
+    spark-submit --master yarn --num-executors 10 --executor-cores 5 --executor-memory 16G --driver-memory 16G --conf spark.driver.maxResultSize=5G --conf spark.hadoop.hive.exec.dynamic.partition=true --conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict pipeline/main_rti_transform.py config.yml
+fi
+
 #Preparing the data by filtering reliable si, remapping r, ipl and recalculating bucket-ids
 #This part might be optional if uckeys have stable slot-id with region data
 if false