# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import copy
import multiprocessing
import os
import sys
from .compat import path_join
from .util import merge_dict, domain_socket_path
class TestProgram(object):
def __init__(self, kind, name, protocol, transport, socket, workdir, stop_signal, command, env=None,
extra_args=[], extra_args2=[], join_args=False, **kwargs):
self.kind = kind = name
self.protocol = protocol
self.transport = transport
self.socket = socket
self.workdir = workdir
self.stop_signal = stop_signal
self.command = None
self._base_command = self._fix_cmd_path(command)
if env:
self.env = copy.copy(os.environ)
self.env = os.environ
self._extra_args = extra_args
self._extra_args2 = extra_args2
self._join_args = join_args
def _fix_cmd_path(self, cmd):
# if the arg is a file in the current directory, make it path
def abs_if_exists(arg):
p = path_join(self.workdir, arg)
return p if os.path.exists(p) else arg
if cmd[0] == 'python':
cmd[0] = sys.executable
cmd[0] = abs_if_exists(cmd[0])
return cmd
def _socket_args(self, socket, port):
return {
'ip-ssl': ['--ssl'],
'domain': ['--domain-socket=%s' % domain_socket_path(port)],
'abstract': ['--abstract-namespace', '--domain-socket=%s' % domain_socket_path(port)],
}.get(socket, None)
def _transport_args(self, transport):
return {
'zlib': ['--zlib'],
}.get(transport, None)
def build_command(self, port):
cmd = copy.copy(self._base_command)
args = copy.copy(self._extra_args2)
args.append('--protocol=' + self.protocol)
args.append('--transport=' + self.transport)
transport_args = self._transport_args(self.transport)
if transport_args:
args += transport_args
socket_args = self._socket_args(self.socket, port)
if socket_args:
args += socket_args
args.append('--port=%d' % port)
if self._join_args:
cmd.append('%s' % " ".join(args))
if self._extra_args:
self.command = cmd
return self.command
class TestEntry(object):
def __init__(self, testdir, server, client, delay, timeout, **kwargs):
self.testdir = testdir
self._log = multiprocessing.get_logger()
self._config = kwargs
self.protocol = kwargs['protocol']
self.transport = kwargs['transport']
self.socket = kwargs['socket']
srv_dict = self._fix_workdir(merge_dict(self._config, server))
cli_dict = self._fix_workdir(merge_dict(self._config, client))
cli_dict['extra_args2'] = srv_dict.pop('remote_args', [])
srv_dict['extra_args2'] = cli_dict.pop('remote_args', [])
self.server = TestProgram('server', **srv_dict)
self.client = TestProgram('client', **cli_dict)
self.delay = delay
self.timeout = timeout
self._name = None
# results
self.success = None
self.as_expected = None
self.returncode = None
self.expired = False
self.retry_count = 0
def _fix_workdir(self, config):
key = 'workdir'
path = config.get(key, None)
if not path:
path = self.testdir
if os.path.isabs(path):
path = os.path.realpath(path)
path = os.path.realpath(path_join(self.testdir, path))
config.update({key: path})
return config
def get_name(cls, server, client, protocol, transport, socket, *args, **kwargs):
return '%s-%s_%s_%s-%s' % (server, client, protocol, transport, socket)
def name(self):
if not self._name:
self._name = self.get_name(,, self.protocol, self.transport, self.socket)
return self._name
def transport_name(self):
return '%s-%s' % (self.transport, self.socket)
def test_name(server, client, protocol, transport, socket, **kwargs):
return TestEntry.get_name(server['name'], client['name'], protocol, transport, socket)