add rti transform module
The module transforms the request table into the impression table.
 product_tag: 'dlpm'
 pipeline_tag: '111021_no_residency_no_mapping' # IMPORTANT: The pipeline tag has to be changed before each run to prevent record duplication.
-factdata_table_name: 'factdata_10202021' #factdata_hq_09222020
+factdata_table_name: 'factdata_test_02112022'  # factdata_10202021 #factdata_hq_09222020
   level: 'warn' # log level for spark and app
   config_table: '{product_tag}_{pipeline_tag}_config'
+  rti_transform: # This transforms request-based factdata into impression-based factdata by filling in the empty fields with defaults
+    default_hour: 7
+    default_price_cat: '1'
+    day_step: 2
+    start_day: '2020-01-01'
+    end_day: '2020-01-02'
+    new_bucket_size: 2
+    input_table: 'factdata_request_01012022'
+    # output_table is factdata_table_name
   filter: # This is for data filtering- si and region
     percentile: 10     # This is for filtering traffic less than 1/10 of average traffic
     output_table_name: '{product_tag}_{pipeline_tag}_tmp_area_map'
     output_table_name: '{product_tag}_{pipeline_tag}_tmp_ts' # name of the hive table that keeps cleansed and normalized data before writing it into tfrecords, over-writes the existing table
     outlier_table: '{product_tag}_{pipeline_tag}_tmp_outlier'
   uckey_clustering: # This is done on whole data, not slicing on buckets
-    pre_cluster_table_name: '{product_tag}_{pipeline_tag}_tmp_pre_cluster'
+    pre_cluster_table_name: '{product_tag}_{pipeline_tag}_tmp_pre_cluster_test_12212021'
     create_pre_cluster_table: True
-    output_table_name: '{product_tag}_{pipeline_tag}_tmp_cluster'
+    output_table_name: '{product_tag}_{pipeline_tag}_tmp_cluster_test_12212021'
       number_of_virtual_clusters: 1000
       cluster_dense_num_ratio_cap: 0.01
       popularity_th: 4
       median_popularity_of_dense: 1856.2833251953125 # median imp of sparse=False, calculate once
   normalization: # This is done on whole data, not slicing on buckets
-    output_table_name: '{product_tag}_{pipeline_tag}_trainready'
+    output_table_name: '{product_tag}_{pipeline_tag}_trainready_test_12212021'
     columns: {
     'a': ['','1','2','3','4','5','6'],
     tfrecords_hdfs_path: 'factdata.tfrecord.{pipeline_tag}' # it is hdfs location for tfrecords, over-writes the existing files
     tf_statistics_path: './tf_statistics_{pipeline_tag}.pkl'
-    output_table_name: '{product_tag}_{pipeline_tag}_tmp_distribution'
-    output_detail_table_name: '{product_tag}_{pipeline_tag}_tmp_distribution_detail'
+    output_table_name: '{product_tag}_{pipeline_tag}_tmp_distribution_test_12212021'
+    output_detail_table_name: '{product_tag}_{pipeline_tag}_tmp_distribution_detail_test_12212021'
   tfrecords_local_path: './factdata.tfrecord.{pipeline_tag}' # it us local path for tfrecords, over-writes the existing files
   start: '' # help="Effective start date. Data before the start is dropped"
   end: '' # help="Effective end date. Data past the end is dropped"
   corr_backoffset: 0 # default=0, type=int, help="Offset for correlation calculation"
-  batch_size: 11880 # batch size of exmaples in tfrecord
+  batch_size: 155000 # batch size of examples in tfrecord
   duration: 82 # time series length, This has to less or equal prepare_past_days
   tf_statistics_path: './tf_statistics_{pipeline_tag}.pkl'
   back_offset: 0 # don't change it.
-  table: '{product_tag}_{pipeline_tag}_model_stat'
+  table: '{product_tag}_{pipeline_tag}_model_stat_test_12212021'
   data_dir: data/vars
   ckpt_dir: data/cpt/s32 
   saved_dir: data/vars
   model_version: 'version_{pipeline_tag}'
-  model_name: 'model_{product_tag}_{pipeline_tag}'
+  model_name: 'model_{product_tag}_{pipeline_tag}_test_12212021'
   train_window: 60 # Should be same as the one in hparams
+# Copyright 2019, Futurewei Technologies
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing,
+#  software distributed under the License is distributed on an
+#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+#  KIND, either express or implied.  See the License for the
+#  specific language governing permissions and limitations
+#  under the License.
+import yaml
+import argparse
+from pyspark import SparkContext
+from pyspark.sql.functions import lit, udf
+from pyspark.sql import HiveContext
+from pyspark.sql.types import StringType, ArrayType
+from datetime import datetime, timedelta
+from util import resolve_placeholder
+import hashlib
+This module transforms
+T1 : request-based factdata
+into
+T2 : factdata compatible with the rest of the pipeline
+|col_name               |data_type|comment|
+|uckey                  |string   |null   |
+|total                  |int      |null   |
+|pt_d                   |string   |null   |
+|# Partition Information|         |       |
+|# col_name             |data_type|comment|
+|pt_d                   |string   |null   |
+CREATE TABLE table_name ( uckey string, total int)                                
+PARTITIONED BY (pt_d string)
+|col_name               |data_type    |comment|
+|uckey                  |string       |null   |
+|count_array            |array<string>|null   |
+|hour                   |int          |null   |
+|day                    |string       |null   |
+|bucket_id              |int          |null   |
+|# Partition Information|             |       |
+|# col_name             |data_type    |comment|
+|day                    |string       |null   |
+|bucket_id              |int          |null   |
+spark-submit --master yarn --num-executors 10 --executor-cores 5 --executor-memory 16G --driver-memory 16G --conf spark.driver.maxResultSize=5G --conf spark.hadoop.hive.exec.dynamic.partition=true --conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict pipeline/ config.yml
+def advance_date(date, day_added):  # Return `date` (a '%Y-%m-%d' string) shifted by `day_added` days, formatted the same way.
+    _time = datetime.strptime(date, "%Y-%m-%d")  # parse the input string; raises ValueError if it does not match the format
+    _time = _time + timedelta(days=day_added)  # shift by whole days
+    return _time.strftime("%Y-%m-%d")
+def assign_new_bucket_id(df, n):  # Return `df` with a 'bucket_id' column computed as sha256(uckey) modulo `n`.
+    def __hash_sha256(s):  # SHA-256 digest of the UTF-8 encoded string, converted from hex to a Python int
+        hex_value = hashlib.sha256(s.encode('utf-8')).hexdigest()
+        return int(hex_value, 16)
+    _udf = udf(lambda x: __hash_sha256(x) % n)  # NOTE(review): udf() is called without returnType (PySpark defaults to StringType); confirm this is compatible with the int bucket_id partition column
+    df = df.withColumn('bucket_id', _udf(df.uckey))  # bucket is derived from the uckey value only
+    return df
+def __save_as_table(df, table_name, hive_context, create_table):  # Append `df` to the Hive table `table_name`; when `create_table` is true the table is dropped and re-created first.
+    if create_table:  # callers pass True only when existing data in the table should be discarded
+        command = """
+            DROP TABLE IF EXISTS {}
+            """.format(table_name)
+        hive_context.sql(command)
+        command = """
+            (
+            uckey string, count_array array<string>, hour int, day string 
+            ) PARTITIONED BY (bucket_id int)
+            """.format(table_name)  # NOTE(review): as visible here the statement has no `CREATE TABLE ... {}` header, so format() has no placeholder to fill - verify against the full file
+        hive_context.sql(command)
+              'count_array',
+              'hour',
+              'day',
+              'bucket_id'
+              ).write.format('hive').option("header", "true").option("encoding", "UTF-8").mode('append').insertInto(table_name)  # columns are listed in table order with the partition column last; insertInto() resolves columns by position
+def run(hive_context,  # Copy `input_table` into `output_table` in windows of days between `start_day` and `end_day`, adding count_array, hour and bucket_id columns.
+        input_table, output_table,
+        start_day, end_day, day_step,
+        new_bucket_size,
+        default_hour, default_price_cat):
+    _st = start_day
+    first_round = True
+    while True:
+        _end = min(end_day, advance_date(_st, day_step))  # string min() orders correctly because dates are zero-padded 'YYYY-MM-DD'
+        if _st > _end:  # the window start has moved past end_day: all days are processed
+            break
+        # Read factdata table
+        command = """
+        SELECT uckey, total, pt_d FROM {} WHERE pt_d BETWEEN '{}' AND '{}'
+        """.format(input_table, _st, _end)
+        _st = advance_date(_end, 1)  # BETWEEN is inclusive on both ends, so the next window starts the day after _end
+        df = hive_context.sql(command)
+        print(command)
+        df = df.withColumnRenamed('pt_d', 'day')
+        # add count_array
+        # [Row(count_array=[u'0:0', u'1:0', u'2:0', u'3:0'], day=u'2018-03-09', hour=0, uckey=u'banner,1,3G,g_f,1,pt,1002,icc')]
+        df = df.withColumn('count_array', udf(lambda x: [default_price_cat + ':' + str(x)], ArrayType(StringType()))(
+       # add hour
+        df = df.withColumn('hour', lit(default_hour))
+        df = assign_new_bucket_id(df, new_bucket_size)
+        # we want to keep the partitions for a batch under 200
+        df = df.repartition(200)
+        # Writing into partitions might throw some exceptions but does not impair data.
+        __save_as_table(df, output_table, hive_context, first_round)  # only the first window drops and re-creates the output table; later windows append
+        first_round = False
+    return
+if __name__ == "__main__":  # Script entry point: read the YAML config given on the command line and run the transform.
+    parser = argparse.ArgumentParser(description='Prepare data')
+    parser.add_argument('config_file')
+    args = parser.parse_args()
+    # Load config file
+    with open(args.config_file, 'r') as ymlfile:
+        cfg = yaml.load(ymlfile, Loader=yaml.FullLoader)  # NOTE(review): FullLoader is not the safe loader; consider yaml.safe_load if the config could come from an untrusted source
+        resolve_placeholder(cfg)  # presumably expands the {product_tag}/{pipeline_tag} placeholders in place - defined in util, verify
+    cfg_log = cfg['log']
+    cfg_rti = cfg['pipeline']['rti_transform']
+    sc = SparkContext()
+    hive_context = HiveContext(sc)
+    sc.setLogLevel(cfg_log['level'])
+    default_hour = cfg_rti['default_hour']
+    default_price_cat = cfg_rti['default_price_cat']
+    day_step = cfg_rti['day_step']
+    start_day = cfg_rti['start_day']
+    end_day = cfg_rti['end_day']
+    new_bucket_size = cfg_rti['new_bucket_size']
+    input_table = cfg_rti['input_table']
+    output_table = cfg['factdata_table_name']  # the output table is the top-level factdata_table_name, not a key under rti_transform
+    run(hive_context=hive_context,
+        input_table=input_table, output_table=output_table,
+        start_day=start_day, end_day=end_day, day_step=day_step,
+        new_bucket_size=new_bucket_size,
+        default_hour=default_hour, default_price_cat=default_price_cat)
+    sc.stop()
     spark-submit --master yarn --num-executors 10 --executor-cores 5 --executor-memory 16G --driver-memory 16G --conf spark.driver.maxResultSize=5G pipeline/ config.yml
+# This module transforms T1 (request-based factdata) into T2 (factdata compatible with the rest of the pipeline)
+if false
+    spark-submit --master yarn --num-executors 10 --executor-cores 5 --executor-memory 16G --driver-memory 16G --conf spark.driver.maxResultSize=5G --conf spark.hadoop.hive.exec.dynamic.partition=true --conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict pipeline/ config.yml
 #Preparing the data by filtering reliable si, remapping r, ipl and recalculating bucket-ids
 #This part might be optional if uckeys have stable slot-id with region data
 if false