# -*- coding: UTF-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
This script, sum the impression for a traffic from factdata table.
Run like this.
# spark-submit --master yarn --num-executors 10 --executor-cores 5 --executor-memory 16G --driver-memory 16G imp_of_traffic.py
"""
from pyspark import SparkContext
from pyspark.sql import HiveContext
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, IntegerType


def sum_count_array(count_array):
    '''
    Sum the counts in a count_array such as ["2:28", "1:15"].

    Each item is a '<key>:<count>' string; only the count after the colon is used.
    '''
    count = 0
    for item in count_array:
        _, value = item.split(':')
        count += int(value)
    return count
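

# Quick self-check of sum_count_array with made-up sample values: 28 + 15 == 43.
assert sum_count_array(['2:28', '1:15']) == 43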


def run(cfg):
    sc = SparkContext()
    hive_context = HiveContext(sc)
    sc.setLogLevel(cfg['log_level'])

    # Read the bucketing configuration; factdata is scanned bucket_step
    # buckets at a time so no single query has to read the whole table.
    bucket_size = cfg['bucket_size']
    bucket_step = cfg['bucket_step']
    factdata = cfg['factdata_table']

    start_bucket = 0
    total_count = 0
    # Walk the bucket range in windows of bucket_step buckets, e.g. with
    # bucket_size=10 and bucket_step=2: [0,2], [3,5], [6,8], [9,10].
    while True:
        end_bucket = min(bucket_size, start_bucket + bucket_step)
        if start_bucket > end_bucket:
            break

        # Read the current slice of the factdata table.
command = """
SELECT
FACTDATA.count_array,
FACTDATA.day,
FACTDATA.hour,
FACTDATA.uckey
FROM {} AS FACTDATA
WHERE FACTDATA.bucket_id BETWEEN {} AND {}
AND FACTDATA.day='{}'
""".format(factdata, str(start_bucket), str(end_bucket), str(cfg['target_day']))
        start_bucket = end_bucket + 1

        df = hive_context.sql(command)
        # e.g. [Row(count_array=[u'1:504'], day=u'2019-11-02', hour=2, uckey=u'magazinelock,04,WIFI,g_m,1,CPM,78')]
        # Extract traffic attributes from the uckey, based on the attributes named in cfg['traffic'].
        uckey_attrs = cfg['uckey_attrs']
        for attr_index in range(len(uckey_attrs)):
            if uckey_attrs[attr_index] in cfg['traffic']:
                # Bind attr_index through a default argument so the closure
                # does not depend on the loop variable when it is serialized.
                df = df.withColumn(uckey_attrs[attr_index],
                                   udf(lambda x, i=attr_index: x.split(',')[i], StringType())(df.uckey))
        # e.g. [Row(uckey=u'magazinelock,01,2G,,,CPM,13', ...,
        #           m=u'magazinelock', si=u'01', t=u'2G', g=u'', a=u'', pm=u'CPM', r=u'13')]
        # Filter uckey based on traffic
        for attr, attr_value in cfg['traffic'].items():
            df = df.filter(df[attr] == str(attr_value))
        # Extract imp-count from count_array
        # e.g. [Row(uckey='native,72bcd2720e5011e79bc8fa163e05184e,WIFI,g_m,5,CPM,15,76', day='2019-11-02', hour=19, count_array=[u'1:504'])]
        df = df.withColumn('count', udf(sum_count_array, IntegerType())(df.count_array))
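        # e.g. count_array=[u'1:504'] yields count=504 for that row.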
        rows = df.agg({'count': 'sum'}).take(1)
        # sum(count) is NULL when the slice has no matching rows; skip it then.
        if len(rows) > 0 and rows[0]['sum(count)'] is not None:
            total_count += rows[0]['sum(count)']

    sc.stop()
    print('FINAL RESULT = {}'.format(str(total_count)))


if __name__ == '__main__':
    cfg = {'factdata_table': 'factdata_hq_09222020_r_ipl_mapped_11052020',
           'bucket_size': 10,
           'bucket_step': 2,
           'log_level': 'INFO',
           'uckey_attrs': ['m', 'si', 't', 'g', 'a', 'pm', 'r', 'ipl'],
           'target_day': '2020-06-01',
           'traffic': {'si': '66bcd2720e5011e79bc8fa163e05184e'}}

    run(cfg)
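
# A minimal sketch (hypothetical, not part of this script's flow): the cfg
# dict could instead be loaded from a YAML file passed on the command line:
#
#     import sys
#     import yaml
#
#     with open(sys.argv[1]) as f:
#         cfg = yaml.safe_load(f)
#     run(cfg)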