#!/usr/bin/env python3
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
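"""Submit a benchmark result data set to the benchmark results database.

Reads metadata.json from the given benchmark directory, inserts a
benchmark_build row, then loads each test's latency results (latency_csv.hgrm)
and Yardstick throughput results (ThroughputLatencyProbe.csv) into Postgres.
Run with --benchmark_dir (required); --identifier and --instance_id override
values read from the metadata.
"""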
import argparse
import csv
import datetime
import glob
import hashlib
import json
import os
import re
import sys

import psycopg2

import creds  # local module providing the Postgres credentials (PGHOST, PGUSER, ...)
parser = argparse.ArgumentParser()
parser.add_argument('--benchmark_dir', '-b', required=True,
                    help='Directory containing the benchmark to be submitted')
parser.add_argument('--identifier', '-i', help='Unique identifier for this benchmark result')
parser.add_argument('--instance_id', '-I', help='Instance id to use if not present in the metadata')
args = parser.parse_args()
build_identifier = ""
benchmark_dir = args.benchmark_dir.rstrip('/')
print("***************************")
print(f"processing benchmark data set in {benchmark_dir}.")
if args.identifier is not None:
    build_identifier = args.identifier
metadata_file = f"{benchmark_dir}/metadata.json"
with open(metadata_file, "r") as read_file:
    metadata_string = read_file.read()
data = json.loads(metadata_string)
# Fields needed to create a benchmark_build entry; default everything to empty.
ci_sha = ""
benchmark_sha = ""
build_version = ""
instance_id = ""
benchmarks_raw_results_uri = ""
notes = ""
build_sha = ""
if data["testMetadata"] is not None:
testmetadata = data["testMetadata"]
if 'instance_id' in testmetadata and testmetadata["instance_id"] is not None:
instance_id = testmetadata["instance_id"]
if (instance_id is None or instance_id == "") and args.instance_id is not None:
instance_id = args.instance_id
if 'source_version' in testmetadata and testmetadata["source_version"] is not None:
build_version = testmetadata["source_version"]
if 'source_revision' in testmetadata and testmetadata["source_revision"] is not None:
build_sha = testmetadata["source_revision"]
if 'benchmark_sha' in testmetadata and testmetadata["benchmark_sha"] is not None:
benchmark_sha = testmetadata["benchmark_sha"]
if (build_identifier is None or build_identifier == "") and \
'build_identifier' in testmetadata and \
testmetadata["build_identifier"] is not None:
build_identifier = testmetadata["build_identifier"]
if build_identifier == "":
    # No identifier was supplied anywhere, so derive a stable one from the
    # metadata contents plus the metadata file's modification time.
    m = hashlib.sha1()
    m.update(metadata_string.encode('utf-8'))
    m.update(f"{os.path.getmtime(metadata_file):.9f}".encode('utf-8'))
    build_identifier = m.hexdigest()
print(f"The build identifier for this benchmark dataset is {build_identifier}")
if instance_id == "":
    # Fall back to the grandparent directory's name, which in the benchmark
    # archive layout matches the Benchmark-<n>-<n> instance id pattern.
    possible_benchmark_archive_dir = benchmark_dir + "/../.."
    possible_instance_id = os.path.basename(os.path.abspath(possible_benchmark_archive_dir))
    if re.search(r'Benchmark-\d+-\d+', possible_instance_id) is not None:
        instance_id = possible_instance_id
print(f"The instance id for this benchmark dataset is {instance_id}")
# Set up a connection to the Postgres server.
conn = psycopg2.connect(host=creds.PGHOST,
                        port=5432,
                        dbname=creds.PGDATABASE,
                        user=creds.PGUSER,
                        password=creds.PGPASSWORD)
print("Connected to database!")
# Create a cursor object
cursor = conn.cursor()
# Figure out if we've already submitted this data set.
identifier_command = "select build_id from public.benchmark_build where build_identifier = %s"
cursor.execute(identifier_command, (build_identifier,))
rows = cursor.fetchall()
if len(rows) > 0:
    print("* This build data has already been submitted to the database.")
    sys.exit(1)
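# Columns of public.benchmark_build to populate for this submission.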
table_columns = [
    "ci_sha",
    "benchmark_sha",
    "build_version",
    "instance_id",
    "benchmarks_raw_results_uri",
    "notes",
    "build_sha",
    "build_identifier"
]
table_values = ["%s"] * len(table_columns)
sql_command = f"INSERT INTO public.benchmark_build({','.join(table_columns)}) " \
f"values ({','.join(table_values)}) returning build_id"
cursor.execute(sql_command, (ci_sha, benchmark_sha, build_version, instance_id, benchmarks_raw_results_uri, notes, build_sha, build_identifier))
build_id = cursor.fetchone()[0]
conn.commit()
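
# Walk each test's client directories and submit their latency and throughput
# results, linked to the benchmark_build row created above.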
if data["testNames"] is not None:
testnames = data["testNames"]
for testname in testnames:
testdir = f"{benchmark_dir}/{testname}"
clientdirs = glob.glob(f"{testdir}/client-*")
for clientdir in clientdirs:
latencyfilename = f"{clientdir}/latency_csv.hgrm"
sql_command = f"INSERT INTO " \
f"public.latency_result(build_id, benchmark_test, value, percentile, " \
f"total_count, one_by_one_minus_percentile) values({build_id}, '{testname}'," \
f" %s, %s, %s, %s)"
print(f"Submitting latency data for {testname}")
with open(latencyfilename) as f:
reader = csv.DictReader(filter(lambda row: row[0] != '#', f))
data = [r for r in reader]
for datum in data:
if datum['1/(1-Percentile)'] != 'Infinity':
cursor.execute(sql_command, (datum['Value'],
datum['Percentile'],
datum['TotalCount'],
datum['1/(1-Percentile)']))
            # glob.glob() returns an empty list (never None) when nothing matches.
            yardstickdirs = glob.glob(f"{clientdir}/*-yardstick-output")
            yardstickdir = yardstickdirs[0] if yardstickdirs else None
            if yardstickdir is not None:
                throughputfilename = f"{yardstickdir}/ThroughputLatencyProbe.csv"
                sql_command = "INSERT INTO public.throughput_result(build_id, benchmark_test, " \
                              "timestamp, ops_per_sec) values(%s, %s, %s, %s)"
                print(f"Submitting throughput data for {testname}")
                with open(throughputfilename) as f:
                    # Yardstick probe files mark non-data lines with a leading '#', '-', '@' or '*'.
                    reader = csv.DictReader(filter(lambda row: row and row[0] not in '#-@*', f),
                                            fieldnames=('time', 'operations', 'latency'))
                    throughput_rows = [r for r in reader]
                for datum in throughput_rows:
                    cursor.execute(sql_command,
                                   (build_id,
                                    testname,
                                    datetime.datetime.fromtimestamp(int(datum['time'])),
                                    int(float(datum['operations']))))
# Commit all submitted result rows in one transaction, then clean up.
conn.commit()
cursor.close()
conn.close()