blob: dbfe32b84e00b7b918c7ba91b56dfa36479a71c9 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
import pyspark.sql.functions as fn
from pyspark import SparkContext
from pyspark.sql import HiveContext
''' Read the ts table and calculate the per-day traffic for each SI. '''
def run(hive_context, input_table_name):
    """Export the (si, ts) table as a wide, per-day CSV.

    Each source row holds an SI identifier plus an array column ``ts`` of
    per-day traffic counts. The array is expanded into one output column per
    index, up to the longest array found in the table; rows whose array is
    shorter are padded with 0. The result is written to the file
    ``total_si`` via pandas.

    :param hive_context: active HiveContext used to run the SQL query.
    :param input_table_name: fully-qualified Hive table holding (si, ts).
    """
    # Pull only the columns this export needs.
    query = """
    SELECT si,ts FROM {}
    """.format(input_table_name)
    # DataFrame[ts: array<int>]
    df = hive_context.sql(query)

    array_cols = ['ts']
    # Per-row array lengths, then the maximum length over the whole table:
    # this fixes how many output columns the wide frame needs.
    lengths = df.select(*(fn.size(c).alias(c) for c in array_cols))
    maxima = lengths.agg(*(fn.max(c).alias(c) for c in array_cols))
    max_len = maxima.collect()[0].asDict()

    # One column per array index, alongside the SI key.
    element_cols = [df[c][i] for c in array_cols for i in range(max_len[c])]
    df_result = df.select('si', *element_cols)
    # Arrays shorter than the maximum yield nulls at the tail; zero-fill them.
    df_result = df_result.na.fill(value=0)
    df_result.toPandas().to_csv('total_si')
if __name__ == '__main__':
    # Stand up Spark with Hive support and run the export for one fixed
    # intermediate table produced by the 2021-10-05 14:00 pipeline run.
    spark_context = SparkContext()
    hive_ctx = HiveContext(spark_context)
    run(hive_context=hive_ctx, input_table_name="dlpm_10052021_1400_tmp_ts")