blob: 95c3f935ae9aecc44dcf74f575df959677a12b2d [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Queries Jira for BEAM test-failure issues and stores them in PostgreSQL.
import os
import psycopg2
import re
import requests
import socket
import sys
import time
from datetime import datetime, timedelta
from xml.etree import ElementTree
# Kept for reference when debugging locally: the commented-out lines below
# fetch the Docker host machine's IP for testing purposes.
# Use the actual DB host in production.
# import subprocess
# cmd_out = subprocess.check_output(["ip", "route", "show"]).decode("utf-8")
# host = cmd_out.split(" ")[2]
# PostgreSQL connection settings. Every variable is required: a missing one
# raises KeyError as soon as this module is loaded.
host, port, dbname, dbusername, dbpassword = (
    os.environ[variable]
    for variable in ('DB_HOST', 'DB_PORT', 'DB_DBNAME', 'DB_DBUSERNAME',
                     'DB_DBPWD'))
# Names of the PostgreSQL tables this script owns.
jiraIssuesTableName = 'jira_issues'
jiraIssuesMetadataTableName = 'jira_issues_metadata'

# DDL for the issues table: one row per Jira issue, keyed by the Jira issue id.
jiraIssuesCreateTableQuery = f"""
create table {jiraIssuesTableName} (
id integer PRIMARY KEY,
key varchar,
creator varchar,
assignee varchar,
status varchar,
labels varchar,
summary varchar,
created TIMESTAMP,
resolutiondate TIMESTAMP)
"""

# DDL for the sync-metadata table, which stores the last sync time.
jiraIssuesMetadataCreateTableQuery = f"""
create table {jiraIssuesMetadataTableName} (
lastsynctime TIMESTAMP
)
"""
def fetchIssues(startTime, startAt=0, maxResults=100):
    """Fetch one page of BEAM test-failure issues updated after a given time.

    Args:
      startTime: datetime; only issues whose ``updated`` field is later than
        this are returned. Only minute precision is sent to Jira.
      startAt: zero-based index of the first issue in the page.
      maxResults: maximum number of issues requested for the page.

    Returns:
      The decoded JSON body of the Jira search response.

    Raises:
      requests.HTTPError: if Jira responds with an error status.
      requests.Timeout: if connecting to or reading from Jira stalls for more
        than 60 seconds.
    """
    # JQL date format is "yyyy-mm-dd hh:mm", e.g. "0001-01-01 00:01".
    # NOTE(review): startTime is a naive datetime; confirm Jira interprets the
    # JQL date in the same time zone as this host's clock.
    startTimeStr = startTime.strftime("%Y-%m-%d %H:%M")
    jql = (f'project=BEAM '
           f'AND updated > "{startTimeStr}" '
           f'AND component=test-failures')
    print(f'Querying Jira: {jql} (startAt={startAt})')
    # Pass the query as params so requests URL-encodes the JQL's spaces,
    # quotes and '=' signs. The timeout keeps a stalled Jira from hanging
    # the sync loop forever.
    response = requests.get(
        'https://issues.apache.org/jira/rest/api/latest/search',
        params={'jql': jql, 'maxResults': maxResults, 'startAt': startAt},
        timeout=60)
    response.raise_for_status()
    return response.json()
def initDBConnection():
    """Open a connection to the metrics PostgreSQL database.

    Retries once a minute until the connection succeeds, so this call blocks
    for as long as the database is unreachable.

    Returns:
      An open psycopg2 connection; the caller is responsible for closing it.
    """
    while True:
        try:
            # Keyword arguments let psycopg2 quote each value itself, so a
            # password containing quotes or spaces cannot corrupt the DSN.
            return psycopg2.connect(dbname=dbname, user=dbusername, host=host,
                                    port=port, password=dbpassword)
        except psycopg2.Error as err:
            # Only database errors are retried; KeyboardInterrupt and
            # programming errors are no longer swallowed.
            print(f'DB connection error: {err}')
            print('Failed to connect to DB; retrying in 1 minute')
            time.sleep(60)
def tableExists(cursor, tableName):
    """Return True if a table named ``tableName`` exists.

    Args:
      cursor: an open DB-API cursor.
      tableName: table name to look up in information_schema.tables.

    Returns:
      True when the lookup returns at least one row, False otherwise.
    """
    # The name is passed as a bound parameter instead of being spliced into
    # the SQL text, so quotes in it cannot break or alter the query.
    cursor.execute("select 1 from information_schema.tables"
                   " where table_name = %s;", (tableName,))
    return bool(cursor.rowcount)
def initDBTablesIfNeeded():
    """Create the issues and sync-metadata tables when they are missing.

    A newly created metadata table is seeded with 1970-01-01 as the last
    sync time, so the first sync queries every matching issue. All changes
    are committed together; on error they are rolled back and the
    connection is still closed.
    """
    connection = initDBConnection()
    try:
        # `with connection` commits on success and rolls back on error; it
        # does not close the connection, hence the outer finally.
        with connection, connection.cursor() as cursor:
            issuesTableExists = tableExists(cursor, jiraIssuesTableName)
            print('Issues table exists', issuesTableExists)
            if not issuesTableExists:
                # psycopg2 raises psycopg2.Error if the statement fails;
                # rowcount is not a reliable success signal for DDL.
                cursor.execute(jiraIssuesCreateTableQuery)

            metadataTableExists = tableExists(cursor,
                                              jiraIssuesMetadataTableName)
            print('Metadata table exists', metadataTableExists)
            if not metadataTableExists:
                cursor.execute(jiraIssuesMetadataCreateTableQuery)
                minTimestamp = datetime(1970, 1, 1)
                cursor.execute(
                    f"insert into {jiraIssuesMetadataTableName} values (%s)",
                    [minTimestamp])
    finally:
        connection.close()
def updateLastSyncTimestamp(timestamp):
    """Replace the stored last-sync time with ``timestamp``.

    The delete and the insert run in one transaction, so once it commits
    the metadata table holds exactly one row. On error the transaction is
    rolled back, and the connection is closed in either case.

    Args:
      timestamp: datetime to store as the new last-sync time.
    """
    connection = initDBConnection()
    try:
        # `with connection` commits on success and rolls back on error; it
        # does not close the connection, hence the outer finally.
        with connection, connection.cursor() as cursor:
            cursor.execute(f"delete from {jiraIssuesMetadataTableName}")
            cursor.execute(
                f"insert into {jiraIssuesMetadataTableName} values (%s)",
                [timestamp])
    finally:
        connection.close()
def fetchLastSyncTime():
    """Return the last-sync time stored in the metadata table.

    If the table has no row, returns 1970-01-01, the same value
    initDBTablesIfNeeded seeds. The next sync then re-queries every
    matching issue instead of crashing.

    Returns:
      The stored last-sync datetime, or datetime(1970, 1, 1) if none is stored.
    """
    connection = initDBConnection()
    try:
        with connection.cursor() as cursor:
            cursor.execute(
                f'select lastsynctime from {jiraIssuesMetadataTableName}')
            row = cursor.fetchone()
    finally:
        connection.close()
    if row is None:
        return datetime(1970, 1, 1)
    return row[0]
def buildRowValuesArray(issue):
    """Flatten one Jira issue JSON object into a jira_issues row.

    Args:
      issue: one element of the ``issues`` list in a Jira search response.

    Returns:
      A list in jira_issues column order: id, key, creator, assignee, status,
      labels, summary, created, resolutiondate. The creator and assignee
      entries are None when Jira reports no user for them.
    """
    fields = issue['fields']

    def userName(user):
        # Jira sends null for unset user fields (e.g. an unassigned issue);
        # creator gets the same treatment as assignee instead of crashing.
        return user['name'] if user is not None else None

    return [
        issue['id'],
        issue['key'],
        userName(fields['creator']),
        userName(fields['assignee']),
        fields['status']['name'],
        fields['labels'],
        fields['summary'],
        fields['created'],
        fields['resolutiondate'],
    ]
def insertRow(cursor, rowValues):
    """Upsert one issue row into the jira_issues table.

    Inserts the row. If an issue with the same id is already stored, its
    other columns are overwritten instead.

    Args:
      cursor: an open DB-API cursor; committing is left to the caller.
      rowValues: one value per column, in the order produced by
        buildRowValuesArray (id first).

    Raises:
      ValueError: if rowValues does not hold exactly one value per column.
    """
    columns = ('id', 'key', 'creator', 'assignee', 'status', 'labels',
               'summary', 'created', 'resolutiondate')
    if len(rowValues) != len(columns):
        raise ValueError(
            f'Expected {len(columns)} row values, got {len(rowValues)}')
    # Name the columns explicitly so the statement does not depend on the
    # table's physical column order. `id` is the conflict target, so it is
    # left out of the update clause.
    updates = ', '.join(f'{column} = excluded.{column}'
                        for column in columns if column != 'id')
    placeholders = ', '.join(['%s'] * len(columns))
    upsertQuery = (f'insert into {jiraIssuesTableName} '
                   f'({", ".join(columns)}) '
                   f'values ({placeholders}) '
                   f'ON CONFLICT (id) DO UPDATE set {updates}')
    cursor.execute(upsertQuery, rowValues)
def fetchNewData():
    """Upsert every test-failure issue updated since the last sync.

    The new sync timestamp is captured before Jira is queried and stored
    only after all pages are written. An issue updated while the sync runs
    is therefore matched again by the next sync rather than missed.
    """
    currentTimestamp = datetime.now()
    lastSyncTimestamp = fetchLastSyncTime()
    startAt = 0
    total = 1
    while startAt < total:
        queryResult = fetchIssues(lastSyncTimestamp, startAt)
        newIssues = queryResult['issues']
        total = queryResult['total']
        if not newIssues:
            # An empty page would leave startAt unchanged; stop rather than
            # re-request the same offset forever.
            break
        _storeIssues(newIssues)
        startAt += len(newIssues)
    updateLastSyncTimestamp(currentTimestamp)


def _storeIssues(issues):
    """Upsert a page of Jira issues into jira_issues in one transaction."""
    connection = initDBConnection()
    try:
        # `with connection` commits on success and rolls back on error; it
        # does not close the connection, hence the outer finally.
        with connection, connection.cursor() as cursor:
            for issue in issues:
                insertRow(cursor, buildRowValuesArray(issue))
    finally:
        connection.close()
def probeJiraIsUp(host='issues.apache.org', port=443, timeout=10):
    """Return True if a TCP connection to the Jira host can be opened.

    This is only a reachability check ("close enough for Jira"); no HTTP
    request is made.

    Args:
      host: host name or address to probe.
      port: TCP port to probe.
      timeout: seconds to wait for the connection before giving up.

    Returns:
      True if the connection succeeded. False on refusal, timeout or
      name-resolution failure.
    """
    try:
        # The context manager closes the probe socket. create_connection also
        # tries IPv6 addresses and honours the timeout.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False
################################################################################
if __name__ == '__main__':
    print("Started.")
    print("Checking if DB needs to be initialized.")
    sys.stdout.flush()
    initDBTablesIfNeeded()
    print("Start jobs fetching loop.")
    sys.stdout.flush()
    # Runs until the process is killed.
    while True:
        if probeJiraIsUp():
            print("Start fetching data.")
            fetchNewData()
            print("Done fetching data.")
        else:
            print("Jira is unavailable, skipping fetching data.")
        # Sleep after every attempt, including when Jira is down, so an
        # outage does not turn this into a tight re-probe loop.
        print("Sleeping for 5 min.")
        sys.stdout.flush()
        time.sleep(5 * 60)