# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from pyspark import SparkContext
from pyspark.sql import HiveContext
from datetime import datetime, timedelta
from elasticsearch import Elasticsearch
"""
This file calculated the actual and predicted traffic a si.
1. Input is a SI
2. Output is
actual traffic from ts (no join with dist_table, all the uckeys are participating)
predicted traffic from ES
spark-submit --master yarn --py-files lib/imscommon-2.0.0-py2.7.egg --num-executors 5 --executor-cores 3 --executor-memory 16G --driver-memory 16G tests/si_traffic_prediction_ckeck/si_traffic_prediction_check_2.py
+------------------------------+----------+----------+--------+---------+-----+
|version |si |day |actual |predicted|error|
+------------------------------+----------+----------+--------+---------+-----+
|110421_no_residency_no_mapping|a47eavw7ex|2021-07-22|68608008|74065399 |8% |
|110421_no_residency_no_mapping|a47eavw7ex|2021-07-23|69360585|75582893 |9% |
|110421_no_residency_no_mapping|a47eavw7ex|2021-07-24|69407841|70786481 |2% |
|110421_no_residency_no_mapping|a47eavw7ex|2021-07-25|66652553|73027064 |9% |
|110421_no_residency_no_mapping|a47eavw7ex|2021-07-26|65385719|73890032 |13% |
|110421_no_residency_no_mapping|a47eavw7ex|2021-07-27|69777146|73531806 |5% |
|110421_no_residency_no_mapping|a47eavw7ex|2021-07-28|70173316|73850899 |5% |
|110421_no_residency_no_mapping|a47eavw7ex|2021-07-29|69984013|76397722 |9% |
|110421_no_residency_no_mapping|a47eavw7ex|2021-07-30|70183372|75982843 |8% |
|110421_no_residency_no_mapping|a47eavw7ex|2021-07-31|67949880|73744501 |8% |
+------------------------------+----------+----------+--------+---------+-----+
Author: Reza
"""
"""
attr = uckey.split(',')
attributes_condition = {
'm': attr[0],
'si': attr[1],
't': attr[2],
'g': attr[3],
'a': attr[4],
'pm': attr[5],
'r': attr[6],
'ipl': attr[7]
}
"""
def query_predictions_traffic(cfg, starting_day, ending_day, si):
    """Return the total predicted traffic of an si over [starting_day, ending_day], summing the per-day prediction aggregations stored in Elasticsearch."""
    es_host, es_port = cfg['es_host'], cfg['es_port']
    es_index, es_type = cfg['es_predictions_index'], cfg['es_predictions_type']
    es = Elasticsearch([{'host': es_host, 'port': es_port}])
    day = datetime.strptime(starting_day, '%Y-%m-%d')
    end_date = datetime.strptime(ending_day, '%Y-%m-%d')
    predicted_traffic = 0
    # Sum up the daily traffic to get the total predicted traffic.
while day <= end_date:
prediction_day_query = {
"size": 0,
"query": {
"bool": {
"should": [
]
}
},
"aggs": {
"day": {
"sum": {
"field": "ucdoc.predictions.{}.hours.total".format(day.strftime('%Y-%m-%d'))
}
}
}
}
        # Restrict the aggregation to documents that belong to this si.
        match = {
"match": {
"ucdoc.si": si
}
}
prediction_day_query['query']['bool']['should'].append(match)
res = es.search(index=es_index, body=prediction_day_query)
"""
result sample:
"hits" : {
"total" : 1339040,
"max_score" : 0.0,
"hits" : [ ]
},
"aggregations" : {
"day" : {
"value" : 17131.516769555536
}
}"""
        day_predicted_result = res['aggregations']
        # A missing or empty aggregation value counts as zero predicted traffic for the day.
        day_predicted_traffic = int(day_predicted_result['day']['value']) if day_predicted_result['day']['value'] else 0
predicted_traffic += day_predicted_traffic
day = day + timedelta(days=1)
return predicted_traffic
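
# Usage sketch (values taken from the config in __main__ below):
#
#     cfg = {'es_host': '10.213.37.41', 'es_port': '9200',
#            'es_predictions_index': 'dlpredictor_111021_no_residency_no_mapping_predictions_test_12212021',
#            'es_predictions_type': 'doc'}
#     total = query_predictions_traffic(cfg, '2021-07-22', '2021-07-31', 'a47eavw7ex')
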
def table_exists(table_name, hive_context):
    """Return True if the given Hive table already exists."""
    command = """
        SHOW TABLES LIKE '{}'
        """.format(table_name)
    df = hive_context.sql(command)
    return df.count() > 0


def __save_as_table(df, table_name, hive_context):
    # Create the table from the dataframe on the first run; append on later runs.
    if not table_exists(table_name, hive_context):
        df.createOrReplaceTempView("r900_temp_table")
        command = """
            CREATE TABLE IF NOT EXISTS {} AS SELECT * FROM r900_temp_table
            """.format(table_name)
        hive_context.sql(command)
    else:
        df.write.format('hive').option("header", "true").option("encoding", "UTF-8").mode('append').insertInto(table_name)
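
# First call creates the table from the dataframe; subsequent calls append to it, e.g.:
#
#     __save_as_table(df, 'si_traffic_prediction_check_111021_12212021', hive_context)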


def c_error(x, y):
    """Return the relative error |x - y| / x rounded to 3 decimals, or -1 when the actual value x is 0."""
    x = x * 1.0
    if x != 0:
        e = abs(x - y) / x
    else:
        e = -1
    e = round(e, 3)
    return e
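
# For instance, with numbers from the sample output above:
#
#     c_error(68608008, 74065399)  # -> 0.08, reported as 8%
#     c_error(0, 100)              # -> -1 (error is undefined when the actual value is 0)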


def run(hive_context, cfg, version, traffic, target_days):
    """Build one report row per target day comparing the actual traffic of an si (from the ts table) with its predicted traffic (from ES)."""
    si = traffic['si']
    command = """
    SELECT
    T1.uckey,
    T1.price_cat,
    T1.si,
    T1.ts,
    T1.ts_ver
    FROM {} AS T1
    WHERE T1.si='{}'
    """.format(cfg['ts_table'], si)
    print(command)
    df = hive_context.sql(command)
    df.cache()
    # The tail of each ts_ver holds the actual (real) daily values for the target days.
    rows = df.collect()
    # Actual values: sum ts_ver across all uckeys of the si.
    real_traffic = [0 for _ in range(len(target_days))]
    # Drop the first element of ts_ver so that its length matches target_days.
    shift = 1
for row in rows:
ts_ver = row['ts_ver']
ts_ver = ts_ver[shift:]
if len(target_days) != len(ts_ver):
raise Exception('ts_ver len is not equal to target_days')
for i in range(len(target_days)):
if ts_ver[i]:
real_traffic[i] += ts_ver[i]
    # Predicted values: query ES once per target day.
predicted_traffic = []
for day in target_days:
v = query_predictions_traffic(cfg=cfg, starting_day=day, ending_day=day, si=si)
predicted_traffic.append(v)
errors = []
for i in range(len(target_days)):
error = c_error(real_traffic[i], predicted_traffic[i])
errors.append(round(100*error, 3))
    # Build one report record per target day.
schema = ["version", "si", "day", "actual", "predicted", "error"]
data = []
for i in range(len(target_days)):
data.append((version, si, target_days[i], real_traffic[i], predicted_traffic[i], errors[i]))
df = hive_context.createDataFrame(data, schema)
#df.show(10, False)
__save_as_table(df, cfg['report_table'], hive_context)


def add_l2_to_l1(l1, l2):
    """Add l2 into l1 element-wise, in place, skipping None entries of l2."""
    for i in range(len(l2)):
        if l2[i] is None:
            continue
        l1[i] += l2[i]


def agg_uckeys(mdict, _days):
    """Sum the time series of all uckeys in mdict into a single list, one value per day."""
    tmp = [0 for _ in range(len(_days))]
    for _, ts in mdict.items():
        add_l2_to_l1(tmp, ts)
    return tmp
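
# For example (illustrative values only):
#
#     totals = [0, 0, 0]
#     add_l2_to_l1(totals, [1, None, 2])   # totals becomes [1, 0, 2]
#     agg_uckeys({'uckey_a': [1, None, 2], 'uckey_b': [3, 4, 5]}, ['d1', 'd2', 'd3'])
#     # -> [4, 4, 7]
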
if __name__ == "__main__":
cfg = {
'log_level': 'WARN',
'ts_table': 'dlpm_111021_no_residency_no_mapping_tmp_ts',
'uckey_attrs': ['m', 'si', 't', 'g', 'a', 'pm', 'r', 'ipl'],
'es_host': '10.213.37.41',
'es_port': '9200',
'es_predictions_index': 'dlpredictor_111021_no_residency_no_mapping_predictions_test_12212021',
'es_predictions_type': 'doc',
'report_table': 'si_traffic_prediction_check_111021_12212021'
}
    # List of target days covered by ts_ver in the ts table.
target_days = sorted(["2021-07-22", "2021-07-23", "2021-07-24", "2021-07-25", "2021-07-26", "2021-07-27", "2021-07-28", "2021-07-29", "2021-07-30"])
VERSION = '110421_no_residency_no_mapping'
traffic = {'si': '', 'version': VERSION}
sis = [
'a47eavw7ex',
'66bcd2720e5011e79bc8fa163e05184e',
'x0ej5xhk60kjwq',
'l03493p0r3',
'7b0d7b55ab0c11e68b7900163e3e481d',
'b6le0s4qo8',
'e351de37263311e6af7500163e291137',
'a290af82884e11e5bdec00163e291137',
'68bcd2720e5011e79bc8fa163e05184e',
'f1iprgyl13',
'w9fmyd5r0i',
'w3wx3nv9ow5i97',
'd971z9825e',
'l2d4ec6csv',
'z041bf6g4s',
'71bcd2720e5011e79bc8fa163e05184e',
'5cd1c663263511e6af7500163e291137',
'x2fpfbm8rt',
'd9jucwkpr3',
'k4werqx13k',
'j1430itab9wj3b',
'a8syykhszz',
's4z85pd1h8',
'17dd6d8098bf11e5bdec00163e291137',
'd4d7362e879511e5bdec00163e291137']
sc = SparkContext.getOrCreate()
hive_context = HiveContext(sc)
sc.setLogLevel(cfg['log_level'])
for si in sis:
traffic['si'] = si
run(hive_context=hive_context, cfg=cfg, version=VERSION, traffic=traffic, target_days=target_days)
print(si)
sc.stop()