blob: a1695bfdc0dfb353ece9510054d009189764841a [file] [log] [blame]
#pylint: disable=no-member, too-many-locals, too-many-branches, no-self-use, broad-except, lost-exception, too-many-nested-blocks, too-few-public-methods
"""
This script runs notebooks in selected directory and report
errors for each notebook.
Traceback information can be found in the output notebooks
generated in coresponding output directories.
Before running this scripe, make sure all the notebooks have
been run at least once and outputs are generated.
"""
import os
import json
import ConfigParser
import re
import sys
from textwrap import dedent
reload(sys)
sys.setdefaultencoding('utf-8')
#pylint: enable=no-member
import nbformat
import nbconvert.preprocessors.execute as execute
TIME_LIMIT_FLAG = '# @@@ AUTOTEST_TIME_LIMT_SECONDS='
IGNORED_CELL_FLAG = '# @@@ AUTOTEST_OUTPUT_IGNORED_CELL'
class CustomizedPreprocessor(execute.ExecutePreprocessor):
"""A customized preprocessor which allows preset for cell.
In this test script, timeout is set before executing a cell.
"""
def preprocess_cell(self, cell, resources, cell_index):
"""
Executes a code cell with timeout. Default timeout is 900 sec.
"""
if cell.cell_type != 'code':
return cell, resources
regex = re.compile(TIME_LIMIT_FLAG + '[0-9]+')
time_flag = re.search(regex, cell.source)
if time_flag is not None:
timeout = int(re.search(r'[0-9]+', time_flag).group())
self.timeout = timeout
outputs = self.run_cell(cell)
cell.outputs = outputs
if not self.allow_errors:
for out in outputs:
if out.output_type == 'error':
pattern = u"""\
An error occurred while executing cell No.{cell.execution_count}:
------------------
{cell.source}
------------------
{out.ename}: {out.evalue}
"""
msg = dedent(pattern).format(out=out, cell=cell)
raise execute.CellExecutionError(msg)
return cell, resources
class NotebookTester(object):
"""The class of notebook automated testing. A NotebookTester loads a test_config
file and execute each notebook. A report containing detail traceback information
will be generated.
"""
def __init__(self, test_config):
self.test_config = test_config
def __read_config(self, test_config):
"""Read notebooks to be tested from test config file.
Parameters
----------
test_config : str
test configuration file
Returns
-------
nb_list : list
Notebook list to be tested
"""
nb_list = []
config_parser = ConfigParser.RawConfigParser()
config_parser.read(test_config)
test_dirs = config_parser.get('Folder Path', 'test_path').split(', ')
if len(test_dirs) == 1 and len(test_dirs[0]) == 0:
test_dirs.append('.')
ignored_item = config_parser.get('Folder Path', 'test_ignored').split(', ')
ignored_dir = set()
ignored_nb = set()
for item in ignored_item:
if item == '@@@ IGNORE_ALL':
return nb_list
if item.endswith('.ipynb'):
ignored_nb.add(os.path.abspath(item))
else:
for root, _, _ in os.walk(item):
ignored_dir.add(os.path.abspath(root))
for test_dir in test_dirs:
for root, _, files in os.walk(test_dir):
if os.path.abspath(root) in ignored_dir:
continue
for test_file in files:
if test_file.endswith('.ipynb') and not \
test_file.endswith('-checkpoint.ipynb'):
notebook = os.path.join(root, test_file)
if os.path.abspath(notebook) not in ignored_nb:
if notebook.startswith('./'):
notebook = notebook[2:]
nb_list.append(notebook)
return nb_list
def __notebook_run(self, path):
"""Execute a notebook via nbconvert and collect output.
Parameters
----------
path : str
notebook file path.
Returns
-------
error : str
notebook first cell execution errors.
"""
error = ""
parent_dir, nb_name = os.path.split(path)
with open(path) as nb_file:
notebook = nbformat.read(nb_file, as_version=4)
eprocessor = CustomizedPreprocessor(timeout=900)
#Use a loop to avoid "Kernel died before replying to kernel_info" error, repeat 5 times
for _ in range(0, 5):
error = ""
try:
eprocessor.preprocess(notebook, {'metadata': {'path': parent_dir}})
except Exception as ex_error:
error = str(ex_error)
finally:
if error != 'Kernel died before replying to kernel_info':
output_nb = os.path.splitext(nb_name)[0] + "_output.ipynb"
with open(output_nb, mode='w') as output_file:
nbformat.write(notebook, output_file)
output_file.close()
nb_file.close()
if len(error) == 0:
cell_num = self.__verify_output(path, output_nb)
if cell_num > 0:
error = "Output in cell No.%d has changed." % cell_num
os.remove(output_nb)
return error
return error
def __verify_output(self, origin_nb, output_nb):
"""Compare the output cells of testing output notebook with original notebook.
Parameters
----------
origin_nb : str
original notebook file path.
output_nb : str
output notebook file path.
Returns
-------
cell_num : int
First cell number in which outputs are incompatible
"""
cell_num = 0
origin_nb_file = open(origin_nb)
origin_nb_js = json.load(origin_nb_file)
output_nb_file = open(output_nb)
output_nb_js = json.load(output_nb_file)
for origin_cell, output_cell in zip(origin_nb_js["cells"], output_nb_js["cells"]):
is_ignored_cell = False
if len(origin_cell["source"]) == 0 or not origin_cell.has_key("outputs"):
is_ignored_cell = True
for line in origin_cell["source"]:
if line.startswith(IGNORED_CELL_FLAG):
is_ignored_cell = True
break
if is_ignored_cell:
continue
if self.__extract_output(origin_cell["outputs"]) != \
self.__extract_output(output_cell["outputs"]):
cell_num = origin_cell["execution_count"]
break
origin_nb_file.close()
output_nb_file.close()
return cell_num
def __extract_output(self, outputs):
"""Extract text part of output of a notebook cell.
Parasmeters
-----------
outputs : list
list of output
Returns
-------
ret : str
Concatenation of all text output contents
"""
ret = ''
for out_dict in outputs:
for key, val in out_dict.items():
if str(key).startswith('text'):
for content in val:
ret += str(content)
elif key == 'data':
for dt_key, dt_val in val.items():
if str(dt_key).startswith('text') and not \
str(dt_key).startswith('text/html'):
for dt_content in dt_val:
if not str(dt_content).startswith('<matplotlib') and not \
str(dt_content).startswith('<graphviz'):
ret += str(dt_content)
return ret
def run_test(self):
"""Run test using config file
"""
nb_to_test = self.__read_config(self.test_config)
test_summary = open('test_summary.txt', mode='w')
fail_nb_dict = {}
test_summary.write("%d notebooks were tested:\n" % len(nb_to_test))
for test_nb in nb_to_test:
test_summary.write("%s\n" % test_nb)
print "Start to test %s.\n" % test_nb
error = self.__notebook_run(test_nb)
if len(error) == 0:
print "Tests for %s all passed!\n" % test_nb
else:
fail_nb_dict[test_nb] = error
print "Tests for %s failed:\n" % test_nb
print error + '\n'
if error == 'Cell execution timed out, see log for details.' or \
error == 'Kernel died before replying to kernel_info':
print "Please manually run this notebook to debug.\n"
print "%d notebooks tested, %d succeeded, %d failed" % (len(nb_to_test),
len(nb_to_test) - len(fail_nb_dict),
len(fail_nb_dict))
if len(fail_nb_dict) > 0:
test_summary.write("\n%d notebook tests failed:\n" % len(fail_nb_dict))
print "Following are failed notebooks:"
for fail_nb, error in fail_nb_dict.items():
test_summary.write("\n%s:\n" % fail_nb)
test_summary.write("%s\n" % error)
print fail_nb
else:
test_summary.write("\nAll notebook tests passed!\n")
test_summary.close()
print "Test summarys are stored in test_summary.txt"
if __name__ == "__main__":
NB_TESTER = NotebookTester('test_config.txt')
NB_TESTER.run_test()