blob: f7bbe71ebc9aff24ed7d962850a42bd09eb03a23 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License
# Extends the subprocess module to use runtime checkers, and report stderr output.
import os
import re
import subprocess
import tempfile
from subprocess import PIPE
def in_path(name):
"""Look for name in the PATH"""
for path in os.environ["PATH"].split(os.pathsep):
f = os.path.join(path, name)
if os.path.isfile(f) and os.access(f, os.X_OK):
return f
class TestProcessError(Exception):
def __init__(self, proc, what, output=None):
self.output = output
sep = "\n%s stderr(%s) %s\n" % ("_" * 32,, "_" * 32)
error = sep + proc.error + sep if proc.error else ""
super(TestProcessError, self).__init__("%s pid=%s exit=%s: %s%s" % (
proc.cmd,, proc.returncode, what, error))
class Popen(subprocess.Popen):
Add TEST_EXE_PREFIX to the command, check stderr for runtime checker output.
In a 'with' statement it runs check_wait() on exit from the block, or
check_kill() if initialized with kill_me=True
def __init__(self, *args, **kwargs):
Takes all args and kwargs of subprocess.Popen except stdout, stderr, universal_newlines
kill_me=True runs check_kill() in __exit__() instead of check_wait()
self.on_exit = self.check_kill if kwargs.pop('kill_me', False) else self.check_wait
self.errfile = tempfile.NamedTemporaryFile(delete=False)
kwargs.update({'universal_newlines': True, 'stdout': PIPE, 'stderr': self.errfile})
prefix = os.environ.get("TEST_EXE_PREFIX")
if prefix:
args = [prefix.split() + args[0]] + list(args[1:])
self.cmd = args[0]
super(Popen, self).__init__(*args, **kwargs)
def check_wait(self):
if self.wait() or self.error:
raise TestProcessError(self, "check_wait")
def communicate(self, *args, **kwargs):
result = super(Popen, self).communicate(*args, **kwargs)
if self.returncode or self.error:
raise TestProcessError(self, "check_communicate", result[0])
return result
def check_kill(self):
"""Raise if process has already exited, kill and raise if self.error is not empty"""
if self.poll() is None:
self.stdout.close() # Doesn't get closed if killed
if self.error:
raise TestProcessError(self, "check_kill found error output")
raise TestProcessError(self, "check_kill process not running")
def expect(self, pattern):
line = self.stdout.readline()
match =, line)
if not match:
raise TestProcessError(self, "can't find '%s' in '%s'" % (pattern, line))
return match
def error(self):
"""Return stderr as string, may only be used after process has terminated."""
assert(self.poll is not None)
if not hasattr(self, "_error"):
self.errfile.close() # Not auto-deleted
with open( as f: # Re-open to read
self._error =
return self._error
def __enter__(self):
return self
def __exit__(self, *args):
def check_output(*args, **kwargs):
return Popen(*args, **kwargs).communicate()[0]
class Server(Popen):
"""A process that prints 'listening on <port>' to stdout"""
def __init__(self, *args, **kwargs):
super(Server, self).__init__(*args, **kwargs)
self.port = self.expect("listening on ([0-9]+)$").group(1)