blob: c18b45c5e65d14f5efbebf467e2d7b477a204983 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0.html
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from pyspark import SparkContext
from pyspark.sql import HiveContext
import pyspark.sql.functions as functions
from operator import add
import time
"""
Author: Eric Tsai
This script reads the *_tmp_ts and *_distribution_detail Hive tables and print the
impression totals for all the uckeys, the dense uckeys, and the sparse ones.
"""
def run(cfg):
start_time = time.time()
sc = SparkContext()
hive_context = HiveContext(sc)
sc.setLogLevel(cfg['log_level'])
# Load the distribution detail table.
distribution_table = cfg['distribution_table']
command = 'select * from {}'.format(distribution_table)
df = hive_context.sql(command)
df = df.select('uckey', 'price_cat', 'ratio')
df.cache()
# Get the dense uckeys.
df_dense = df.filter(df['ratio'] == 1)
# Get the sparse uckeys.
df_sparse = df.filter(df['ratio'] != 1)
# Read the time series tables and unpack the time series column and
# filter out the nulls.
time_series_table = cfg['time_series_table']
command = 'select * from {}'.format(time_series_table)
df_ts = hive_context.sql(command)
df_ts = df_ts.withColumn('day_imp', functions.explode(functions.col('ts')))
df_ts = df_ts.filter(df_ts.day_imp.isNotNull())
df_ts.select('uckey', 'price_cat', 'day_imp')
df_ts.cache()
# Calculate the total impressions in the time series table.
total = df_ts.rdd.map(lambda x: (1,x['day_imp'])).reduceByKey(add).collect()[0][1]
print('Total: {:>15,}'.format(total))
lap_time = int(time.time() - start_time)
print('Total elapsed time: {}:{:02}'.format(lap_time/60, lap_time%60))
# Calculate the total number of impressions in the dense (non-sparse) uckeys.
df_ts_dense = df_ts.join(df_dense, on=[ 'uckey', 'price_cat' ], how='inner')
dense_total = df_ts_dense.rdd.map(lambda x: (1,x['day_imp'])).reduceByKey(add).collect()[0][1]
print('Dense impression total: {:>15,}'.format(dense_total))
lap_time = int(time.time() - start_time)
print('Total elapsed time: {}:{:02}'.format(lap_time/60, lap_time%60))
# Calculate the total number of impressions of sparse uckeys.
df_ts_sparse = df_ts.join(df_sparse, on=[ 'uckey', 'price_cat' ], how='inner')
sparse_total = df_ts_sparse.rdd.map(lambda x: (1,x['day_imp'])).reduceByKey(add).collect()[0][1]
print('Sparse impression total: {:>15,}'.format(dense_total))
lap_time = int(time.time() - start_time)
print('Total elapsed time: {}:{:02}'.format(lap_time/60, lap_time%60))
df_ts_other = df_ts.join(df_dense, on=[ 'uckey', 'price_cat' ], how='left_anti')
df_ts_other = df_ts_other.join(df_sparse, on=[ 'uckey', 'price_cat' ], how='left_anti')
df_ts_other.select('day_imp')
other_total = df_ts_other.rdd.map(lambda x: (1,x['day_imp'])).reduceByKey(add).collect()[0][1]
lap_time = int(time.time() - start_time)
print('Total elapsed time: {}:{:02}'.format(lap_time/60, lap_time%60))
sc.stop()
# Print the results.
print('Reading data from tables: {} and {}'.format(time_series_table, distribution_table))
print('Dense impression total: {:>15,}'.format(dense_total))
print('Sparse impression total: {:>15,}'.format(sparse_total))
print('Other impression total: {:>15,}'.format(other_total))
print('Dense + sparse total: {:>15,}'.format(dense_total + sparse_total))
print('Dense + sparse + other total: {:>15,}'.format(dense_total + sparse_total + other_total))
print('Total impressions in table: {:>15,}'.format(total))
print('Dense impression %: {:>5.1f}%'.format(float(dense_total)*100/(dense_total + sparse_total)))
print('Sparse impression %: {:>5.1f}%'.format(float(sparse_total)*100/(dense_total + sparse_total)))
if __name__ == '__main__':
cfg = {
# 'distribution_table': 'dlpm_03182021_tmp_distribution_detail',
# 'time_series_table': 'dlpm_03182021_tmp_ts',
'distribution_table': 'dlpm_05182021_1500_tmp_distribution_detail',
'time_series_table': 'dlpm_05182021_1500_tmp_ts',
'log_level': 'WARN'
}
run(cfg)