blob: dff7623b246ad51ee1d3ef03a9575fe698424d08 [file] [log] [blame]
import os
import re
import time
from datetime import datetime, timedelta
from io import SEEK_END
from typing import List, Tuple, Any
class HttpdErrorLog:
"""Checking the httpd error log for errors and warnings, including
limiting checks from a last known position forward.
"""
RE_ERRLOG_ERROR = re.compile(r'.*\[(?P<module>[^:]+):error].*')
RE_ERRLOG_WARN = re.compile(r'.*\[(?P<module>[^:]+):warn].*')
RE_APLOGNO = re.compile(r'.*\[(?P<module>[^:]+):(error|warn)].* (?P<aplogno>AH\d+): .+')
RE_SSL_LIB_ERR = re.compile(r'.*\[ssl:error].* SSL Library Error: error:(?P<errno>\S+):.+')
def __init__(self, path: str):
self._path = path
self._ignored_modules = []
self._ignored_lognos = set()
self._ignored_patterns = []
# remember the file position we started with
self._start_pos = 0
if os.path.isfile(self._path):
with open(self._path) as fd:
self._start_pos = fd.seek(0, SEEK_END)
self._last_pos = self._start_pos
self._last_errors = []
self._last_warnings = []
self._observed_erros = set()
self._observed_warnings = set()
def __repr__(self):
return f"HttpdErrorLog[{self._path}, errors: {' '.join(self._last_errors)}, " \
f"warnings: {' '.join(self._last_warnings)}]"
@property
def path(self) -> str:
return self._path
def clear_log(self):
if os.path.isfile(self.path):
os.remove(self.path)
self._start_pos = 0
self._last_pos = self._start_pos
self._last_errors = []
self._last_warnings = []
self._observed_erros = set()
self._observed_warnings = set()
def set_ignored_modules(self, modules: List[str]):
self._ignored_modules = modules.copy() if modules else []
def set_ignored_lognos(self, lognos: List[str]):
if lognos:
for l in lognos:
self._ignored_lognos.add(l)
def add_ignored_patterns(self, patterns: List[Any]):
self._ignored_patterns.extend(patterns)
def _is_ignored(self, line: str) -> bool:
for p in self._ignored_patterns:
if p.match(line):
return True
m = self.RE_APLOGNO.match(line)
if m and m.group('aplogno') in self._ignored_lognos:
return True
return False
def get_recent(self, advance=True) -> Tuple[List[str], List[str]]:
"""Collect error and warning from the log since the last remembered position
:param advance: advance the position to the end of the log afterwards
:return: list of error and list of warnings as tuple
"""
self._last_errors = []
self._last_warnings = []
if os.path.isfile(self._path):
with open(self._path) as fd:
fd.seek(self._last_pos, os.SEEK_SET)
for line in fd:
if self._is_ignored(line):
continue
m = self.RE_ERRLOG_ERROR.match(line)
if m and m.group('module') not in self._ignored_modules:
self._last_errors.append(line)
continue
m = self.RE_ERRLOG_WARN.match(line)
if m:
if m and m.group('module') not in self._ignored_modules:
self._last_warnings.append(line)
continue
if advance:
self._last_pos = fd.tell()
self._observed_erros.update(set(self._last_errors))
self._observed_warnings.update(set(self._last_warnings))
return self._last_errors, self._last_warnings
def get_recent_count(self, advance=True):
errors, warnings = self.get_recent(advance=advance)
return len(errors), len(warnings)
def ignore_recent(self):
"""After a test case triggered errors/warnings on purpose, add
those to our 'observed' list so the do not get reported as 'missed'.
"""
self._last_errors = []
self._last_warnings = []
if os.path.isfile(self._path):
with open(self._path) as fd:
fd.seek(self._last_pos, os.SEEK_SET)
for line in fd:
if self._is_ignored(line):
continue
m = self.RE_ERRLOG_ERROR.match(line)
if m and m.group('module') not in self._ignored_modules:
self._observed_erros.add(line)
continue
m = self.RE_ERRLOG_WARN.match(line)
if m:
if m and m.group('module') not in self._ignored_modules:
self._observed_warnings.add(line)
continue
self._last_pos = fd.tell()
def get_missed(self) -> Tuple[List[str], List[str]]:
errors = []
warnings = []
if os.path.isfile(self._path):
with open(self._path) as fd:
fd.seek(self._start_pos, os.SEEK_SET)
for line in fd:
if self._is_ignored(line):
continue
m = self.RE_ERRLOG_ERROR.match(line)
if m and m.group('module') not in self._ignored_modules \
and line not in self._observed_erros:
errors.append(line)
continue
m = self.RE_ERRLOG_WARN.match(line)
if m:
if m and m.group('module') not in self._ignored_modules \
and line not in self._observed_warnings:
warnings.append(line)
continue
return errors, warnings
def scan_recent(self, pattern: re, timeout=10):
if not os.path.isfile(self.path):
return False
with open(self.path) as fd:
end = datetime.now() + timedelta(seconds=timeout)
while True:
fd.seek(self._last_pos, os.SEEK_SET)
for line in fd:
if pattern.match(line):
return True
if datetime.now() > end:
raise TimeoutError(f"pattern not found in error log after {timeout} seconds")
time.sleep(.1)
return False