blob: 7baea3c73ed0a44bdfa3fa423bebae1bceee79bb [file] [log] [blame]
#!/usr/bin/env python3
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""JIRA to PyPubSub bridge - polls JIRA for updates and pushes them to pypubsub."""
import requests
import time
import pytz
import datetime
import yaml
import sys
DEFAULT_POLL_INTERVAL = 5 # Default is poll once every five seconds.
DEFAULT_SETTINGS_FILE = "config.yaml"
# Launch time is recorded, so that we never fetch JIRA changes that are older than this.
LAUNCH_TIME = time.time()
# Global to keep track of the last time we saw an update for a specific ticket.
# This is far cheaper than keeping all the JiraTicket objects in memory.
UPDATES = {}
class Config:
def __init__(self, config_filepath = DEFAULT_SETTINGS_FILE):
self.yaml = yaml.safe_load(open(config_filepath).read())
self.jira_url = self.yaml["jira_url"]
self.jira_user = self.yaml["jira_user"]
self.jira_pass = self.yaml["jira_pass"]
self.jira_base = self.yaml["jira_base"]
self.pubsub_url = self.yaml["pubsub_url"]
self.debug = self.yaml.get("debug", False)
self.poll_interval = int(self.yaml.get("polling_internal", DEFAULT_POLL_INTERVAL))
class JiraTicket:
"""Jira Ticket parser class"""
def __init__(self, config: Config, json_data: dict):
"""Init a Jira ticket with the base data."""
self.key = json_data["key"]
self.link = f"{config.jira_base}{self.key}"
self.summary = json_data["fields"].get("summary", "(No summary available)")
self.created = self.jira_to_datetime(json_data["fields"]["created"])
self.last_update = UPDATES.get(self.key, LAUNCH_TIME)
self.events = []
self.json_data = json_data
self.config = config
@staticmethod
def jira_to_datetime(t: str):
"""Parses Jira timestamps into datetime objects"""
return datetime.datetime.strptime(t, "%Y-%m-%dT%H:%M:%S.%f%z")
def make_event_dict(self, entry: dict) -> [dict | None]:
"""Constructs a basic change event from a Jira change-set entry of any type"""
if "created" not in entry: # All entries must have this field, or we'll break...
return
change_epoch = self.jira_to_datetime(entry.get("updated", entry["created"])).timestamp()
# If this change is older than our last seen update, ignore and return None
if change_epoch <= self.last_update:
return
# Fetch author name and user ID if present
if "author" not in entry and "creator" in entry: # Rewrite author->creator if no author of this entry.
entry["author"] = entry["creator"]
if "author" in entry:
author_name = entry["author"].get("displayName", "??")
author_uid = entry["author"].get("name", "??")
else:
author_name = "??"
author_uid = "??"
# Make the basic event dict
base_event = {
"key": self.key,
"link": self.link,
"summary": self.summary,
"project": self.key.split("-", 1)[0],
"action": "unknown",
"author": author_name,
"author_uid": author_uid,
"timestamp": change_epoch,
"base_entry_created": self.jira_to_datetime(entry["created"]).timestamp()
}
return base_event
def parse_changelog(self):
"""Parses a changelog and formats it for humans"""
elements = self.json_data.get("changelog", {}).get("histories", [])
for change in elements:
base_event = self.make_event_dict(change)
if not base_event:
continue
# Check all ticket status changes/edits
for item in change["items"]:
new_event = base_event.copy() # Make a copy of the base event dict for each item
# Changing resolution
if item.get("field") == "resolution":
if not item.get("fromString"): # If changing from no res to a res, assume closing as $X
res = item.get("toString", "Unresolved")
if self.last_update > 0:
new_event["action"] = "close"
new_event["resolution"] = res
new_event["action_human_text"] = f"closed <{self.link}|{self.key}> as _{res}_."
self.events.append(new_event)
elif self.last_update > 0:
old_resolution = item.get("fromString")
new_resolution = item.get("toString", "Unresolved")
new_event["action"] = "resolution"
new_event["from"] = old_resolution
new_event["to"] = new_resolution
new_event["action_human_text"] = f"changed the resolution of <{self.link}|{self.key}> from "
f"_{old_resolution}_ to _{new_resolution}_."
self.events.append(new_event)
# Status change (like WFU/WFI)
elif item.get("field") == "status":
old_status = item.get("fromString", "")
new_status = item.get("toString")
if self.last_update > 0:
new_event["action"] = "status"
new_event["from"] = old_status
new_event["to"] = new_status
new_event["action_human_text"] = f"changed the status of <{self.link}|{self.key}> to *{new_status}*."
self.events.append(new_event)
# Summary change (editing the title of the ticket)
elif item.get("field") == "summary":
old_summary = item.get("fromString", "")
new_summary = item.get("toString")
if self.last_update > 0:
new_event["action"] = "summary"
new_event["from"] = old_summary
new_event["to"] = new_summary
new_event["action_human_text"] = f"changed the summary of <{self.link}|{self.key}>"
self.events.append(new_event)
# Description change (editing the main text body of the ticket)
elif item.get("field") == "description":
old_description = item.get("fromString", "")
new_description = item.get("toString")
if self.last_update > 0:
new_event["action"] = "description"
new_event["from"] = old_description
new_event["to"] = new_description
new_event["action_human_text"] = f"changed the description of <{self.link}|{self.key}>"
self.events.append(new_event)
# Assignee change
elif item.get("field") == "assignee":
old_assignee = item.get("fromString", "")
new_assignee = item.get("toString", "")
if self.last_update > 0:
new_event["action"] = "assign"
new_event["from"] = old_assignee
new_event["to"] = new_assignee
if new_assignee:
new_event["action_human_text"] = f"assigned *{new_assignee}* to <{self.link}|{self.key}>."
else:
new_event["action_human_text"] = f"unassigned *{old_assignee}* from <{self.link}|{self.key}>."
self.events.append(new_event)
def parse_comments(self):
"""Parse comment changes and format them for humans"""
elements = self.json_data["fields"].get("comment", {}).get("comments", [])
for comment in elements:
base_event = self.make_event_dict(comment)
if not base_event:
continue
# Comment altered?
if base_event["base_entry_created"] != base_event["timestamp"]:
if self.last_update > 0:
base_event["action"] = "comment_edit"
base_event["body"] = comment["body"]
base_event["action_human_text"] = "edited a comment"
self.events.append(base_event)
# Or comment created?
elif self.last_update > 0:
base_event["action"] = "comment"
base_event["body"] = comment["body"]
base_event["action_human_text"] = "added a comment"
self.events.append(base_event)
def parse_worklog(self):
"""Parses worklog changes and formats it for humans"""
elements = self.json_data["fields"].get("worklog", {}).get("worklogs", [])
for worklog in elements:
base_event = self.make_event_dict(worklog)
if not base_event:
continue
# Comment altered?
if base_event["base_entry_created"] != base_event["timestamp"]:
if self.last_update > 0:
base_event["action"] = "comment_edit"
base_event["body"] = worklog["comment"]
base_event["timespent"] = worklog["timeSpentSeconds"]
base_event["action_human_text"] = "edited a worklog entry"
self.events.append(base_event)
# Or comment created?
elif self.last_update > 0:
base_event["action"] = "comment"
base_event["body"] = worklog["comment"]
base_event["timespent"] = worklog["timeSpentSeconds"]
base_event["action_human_text"] = "added a worklog entry"
self.events.append(base_event)
def fetch_changes(config: Config):
"""Fetch the latest changes from JIRA and process them"""
try:
js = requests.get(config.jira_url, auth=(config.jira_user, config.jira_pass), timeout=15).json()
assert isinstance(js, dict), "Jira search did not return a dictionary response"
return js.get("issues", [])
except requests.RequestException as e: # This should work - if for whatever reason it doesn't, try again later.
print(f"Could not contact Jira, waiting for next try: {e}")
return []
except AssertionError as e:
print(f"General assertion error with Jira response: {e}")
def process_changes(config: Config, changes: list):
now = datetime.datetime.now(pytz.utc)
for issue in changes:
ticket = JiraTicket(config, issue)
# Is this a brand-new issue?
if ticket.last_update == 0 and (now - ticket.created).seconds <= 30:
UPDATES[ticket.key] = ticket.created.timestamp()
base_event = ticket.make_event_dict(issue["fields"])
base_event["action"] = "create"
base_event["action_human_text"] = f"created {ticket.key}: {ticket.summary}"
base_event["description"] = issue["fields"].get("description", "(No description available)")
ticket.events.append(base_event)
# Parse changelog, comments, worklog
ticket.parse_changelog()
ticket.parse_comments()
ticket.parse_worklog()
if ticket.events: # If any changes were found, update our "last updated" marker for this ticket.
UPDATES[ticket.key] = max([event["timestamp"] for event in ticket.events])
# Post all new events found to pubsub
for event in ticket.events:
# Set some basic things for each change-set that we only get from the base ticket entry
target_url = config.pubsub_url.format(**event)
if config.debug:
print(event["timestamp"], event["key"], event["author"], event["action_human_text"])
print(event)
print("---------------")
else:
try:
requests.post(target_url, json=event, timeout=5)
except requests.RequestException as e:
print(f"Could not post update for {ticket.key}: {e}")
def main():
config = Config(sys.argv[1] if (len(sys.argv) > 1) else DEFAULT_SETTINGS_FILE)
while True: # Loop the Loop, forever and ever.
process_start = time.time() # Log when we started processing this batch
change_set = fetch_changes(config) # Fetch latest change-set
process_changes(config, change_set) # Process change-set (if any)
process_duration = time.time() - process_start # Figure out how long that process took
if process_duration < config.poll_interval: # If we need to sleep, sleep, otherwise poll again.
time.sleep(config.poll_interval - process_duration)
if __name__ == "__main__":
main()