blob: e218ec39f1b6fa037bea1ef5a174e14e6a8db858 [file] [log] [blame]
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
""" Git Evolution scanner """
import os
import subprocess
import re
import time
import datetime
import plugins.utils.git
import plugins.utils.sloc
import hashlib
from collections import namedtuple
# Human-readable name of this scanner plugin.
title = "Git Evolution Scanner"
# Version string of this scanner plugin.
version = "0.1.0"
def accepts(source):
    """ Tell whether this scanner should process the given source.

    Plain git sources are always accepted. GitHub sources are accepted
    only when their 'issuesonly' setting compares equal to False (it
    defaults to False when absent). Every other source type is rejected.
    """
    kind = source['type']
    if kind == 'git':
        return True
    if kind != 'github':
        return False
    # A GitHub repo may be registered only for its issues, in which case
    # we don't want to analyze the code.
    issues_only = source.get('issuesonly', False)
    # Deliberately an equality test rather than truthiness: a value such
    # as None is neither equal to False nor accepted here.
    return issues_only == False  # noqa: E712
def get_first_ref(gpath):
    """ Return the commit timestamp(s) of the repository's root commit(s).

    Args:
        gpath: Path of the git working copy to inspect.

    Returns:
        The raw bytes printed by ``git log --pretty=format:%ct`` for the
        parentless commit(s) reachable from HEAD, or None when git could
        not be run or the repository has no usable history.
    """
    try:
        # Run git directly with cwd= instead of building a shell string, so
        # paths containing spaces or shell metacharacters work as intended.
        roots = subprocess.check_output(
            ["git", "rev-list", "--max-parents=0", "HEAD"],
            cwd=gpath,
        ).decode('ascii', 'replace').split()
        return subprocess.check_output(
            ["git", "log", *roots, "--pretty=format:%ct"],
            cwd=gpath,
        )
    except (subprocess.SubprocessError, OSError, ValueError):
        # CalledProcessError: git failed; OSError: missing directory or git
        # binary; ValueError: malformed argument (e.g. embedded NUL byte).
        print("Could not get first ref, exiting!")
        return None
def acquire(KibbleBit, source):
    """ Flag the 'evolution' step of the source as running and persist it.

    The step record gets the current time, a human-readable start message,
    running=True and good=True, then the source is handed to
    KibbleBit.updateSource().
    """
    started_at = time.strftime("%a, %d %b %Y %H:%M:%S +0000", time.gmtime())
    step = {
        'time': time.time(),
        'status': 'Evolution scan started at ' + started_at,
        'running': True,
        'good': True,
    }
    source['steps']['evolution'] = step
    KibbleBit.updateSource(source)
def release(KibbleBit, source, status, exception=None, good=False):
    """ Flag the 'evolution' step of the source as finished and persist it.

    The step record gets the current time, the given status message,
    running=False and the given 'good' flag. A truthy 'exception' value is
    stored under the 'exception' key. The source is then handed to
    KibbleBit.updateSource().
    """
    step = {
        'time': time.time(),
        'status': status,
        'running': False,
        'good': good,
    }
    if exception:
        step['exception'] = exception
    source['steps']['evolution'] = step
    KibbleBit.updateSource(source)
def check_branch(gpath, date, branch):
    """ Report whether git can resolve ``branch`` in the given repository.

    Runs ``git rev-list -n 1 --before=<date> <branch>`` inside ``gpath``.

    Args:
        gpath: Path of the git working copy.
        date: Date string passed to git's ``--before`` option.
        branch: Branch (or other revision) name to probe.

    Returns:
        True when the git command exits successfully, False otherwise
        (including when the directory or the git binary is missing).
    """
    try:
        # Argument list + cwd= avoids shell quoting problems with paths,
        # dates and branch names. Only the exit status matters, so the
        # matching hash is not echoed to stdout.
        subprocess.check_call(
            ["git", "rev-list", "-n", "1", "--before=%s" % date, branch],
            cwd=gpath,
            stdout=subprocess.DEVNULL,
        )
    except (subprocess.SubprocessError, OSError, ValueError):
        return False
    return True
def checkout(gpath, date, branch):
    """ Check out the newest commit on ``branch`` made before ``date``.

    This is a best-effort operation: any failure to resolve or check out
    the commit is ignored and the working copy is left as it was.

    Args:
        gpath: Path of the git working copy.
        date: Date string passed to git's ``--before`` option.
        branch: Branch (or other revision) name to search.

    Returns:
        None.
    """
    try:
        ref = subprocess.check_output(
            ["git", "rev-list", "-n", "1", "--before=%s" % date, branch],
            cwd=gpath,
            stderr=subprocess.DEVNULL,
        ).decode('ascii', 'replace').strip()
        if not ref:
            # No commit predates ``date``; there is nothing to check out.
            return
        # The trailing "--" makes git treat ``ref`` as a revision, never
        # as a path.
        subprocess.call(
            ["git", "checkout", ref, "--"],
            cwd=gpath,
            stderr=subprocess.DEVNULL,
            stdout=subprocess.DEVNULL,
        )
    except (subprocess.SubprocessError, OSError, ValueError):
        # Deliberately silent: callers treat the checkout as best effort.
        return
def find_branch(date, gpath):
    """ Pick the branch to scan in the given repository.

    Prefers ``master`` when git can resolve it; otherwise falls back to
    the branch name reported for the current HEAD.

    Args:
        date: Date string passed to git's ``--before`` option.
        gpath: Path of the git working copy.

    Returns:
        "master", the abbreviated name of HEAD, or None when the directory
        cannot be entered or git cannot report a branch.
    """
    # Resolve first so a relative path still points at the repository
    # after the working directory changes below.
    repo = os.path.abspath(gpath)
    try:
        # Kept from the original behaviour: the process working directory
        # is moved into the repository.
        os.chdir(repo)
    except OSError:
        # Missing or inaccessible repository: report "no branch" rather
        # than letting the error escape.
        return None

    try:
        subprocess.check_call(
            ["git", "rev-list", "-n", "1", "--before=%s" % date, "master"],
            cwd=repo,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
        return "master"
    except (subprocess.SubprocessError, OSError, ValueError):
        pass  # no usable master branch; fall back to whatever HEAD is

    try:
        head = subprocess.check_output(
            ["git", "rev-parse", "--abbrev-ref", "HEAD"],
            cwd=repo,
            stderr=subprocess.DEVNULL,
        )
    except (subprocess.SubprocessError, OSError, ValueError):
        return None
    return head.decode('ascii', 'replace').strip().strip("* ")
def scan(KibbleBit, source):
    """ Record quarterly line-count snapshots for a git source.

    Starting at the first month of the current quarter and stepping back
    three months at a time until the day of the repository's first commit,
    each snapshot not yet present in the 'evolution' index is checked out,
    counted with plugins.utils.sloc.count() and indexed. Progress and the
    outcome are recorded on the source through acquire() and release().

    Nothing is done when the first commit cannot be determined, when the
    sync step is not marked good, or when the working copy is missing.

    Args:
        KibbleBit: Scanner context providing config, pprint(), exists(),
            index() and updateSource().
        source: Source document; reads 'sourceID', 'sourceURL',
            'organisation' and 'steps', and updates 'steps'.
    """
    rid = source['sourceID']
    url = source['sourceURL']
    rootpath = "%s/%s/git" % (KibbleBit.config['scanner']['scratchdir'], source['organisation'])
    gpath = os.path.join(rootpath, rid)
    gname = rid
    KibbleBit.pprint("Doing evolution scan of %s" % gname)

    inp = get_first_ref(gpath)
    if not inp:
        return

    # Round the first commit's timestamp down to a whole day.
    ts = int(inp.split()[0])
    ts = ts - (ts % 86400)
    date = time.strftime("%Y-%b-%d 0:00", time.gmtime(ts))
    now = time.time()

    if not (source['steps']['sync']['good'] and os.path.exists(gpath)):
        return

    acquire(KibbleBit, source)

    branch = find_branch(date, gpath)
    if not branch:
        # release() takes KibbleBit first; without it the call raises a
        # TypeError instead of recording the failure.
        release(KibbleBit, source, "Could not do evolutionary scan of code",
                "No default branch was found in this repository")
        return

    if not check_branch(gpath, date, branch):
        KibbleBit.pprint("Not trunk either (bad repo?), skipping")
        release(KibbleBit, source, "Could not do evolutionary scan of code",
                "No default branch was found in this repository")
        return

    try:
        d = time.gmtime(now)
        year = d[0]
        # Snap to a quarter boundary month (3, 6, 9 or 12); a result of 0
        # means December of the previous year.
        quarter = d[1] - (d[1] % 3)
        if quarter <= 0:
            quarter += 12
            year -= 1
        while now > ts:
            stamp = datetime.datetime(year, quarter, 1).replace(tzinfo=datetime.timezone.utc).timetuple()
            date = time.strftime("%Y-%b-%d 0:00", stamp)
            # NOTE: time.mktime() interprets the tuple as local time, so on
            # a non-UTC host this value is offset from UTC midnight. Left
            # unchanged to stay consistent with already indexed documents.
            unix = time.mktime(stamp)

            # Skip the dates we've already processed
            dhash = hashlib.sha224((rid + date).encode('ascii', 'replace')).hexdigest()
            if not KibbleBit.exists('evolution', dhash):
                # Must be the repository path, not its ID, or the checkout
                # silently fails and every snapshot counts the same tree.
                checkout(gpath, date, branch)
                KibbleBit.pprint("Running cloc on %s (%s) at %s" % (gname, url, date))
                languages, codecount, comment, blank, years, cost = plugins.utils.sloc.count(gpath)
                js = {
                    'time': unix,
                    'sourceID': rid,
                    'sourceURL': url,
                    'organisation': source['organisation'],
                    'loc': codecount,
                    'comments': comment,
                    'blank': blank,
                    'years': years,
                    'cost': cost,
                    'languages': languages,
                }
                KibbleBit.index('evolution', dhash, js)

            # Step back one quarter (three months).
            quarter -= 3
            if quarter <= 0:
                quarter += 12
                year -= 1
            now = time.mktime(datetime.date(year, quarter, 1).timetuple())
    except Exception as e:
        # Top-level boundary of the scan: record the failure on the source.
        KibbleBit.pprint(e)
        release(KibbleBit, source, "Evolution scan failed at " +
                time.strftime("%a, %d %b %Y %H:%M:%S +0000", time.gmtime()),
                str(e))
        return

    release(KibbleBit, source, "Evolution scan completed at " +
            time.strftime("%a, %d %b %Y %H:%M:%S +0000", time.gmtime()),
            good=True)