blob: 6101a09624fc4bd4c0b4632923d1671ddc2e49bc [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
import contextlib
import glob
import inspect
import os
import shutil
import jinja2 as jinja2
import psutil
import re
import signal
import subprocess
import time
@contextlib.contextmanager
def get_or_create_cache(client, settings):
cache = client.get_or_create_cache(settings)
try:
yield cache
finally:
cache.destroy()
@contextlib.asynccontextmanager
async def get_or_create_cache_async(client, settings):
cache = await client.get_or_create_cache(settings)
try:
yield cache
finally:
await cache.destroy()
def wait_for_condition(condition, interval=0.1, timeout=10, error=None):
start = time.time()
res = condition()
while not res and time.time() - start < timeout:
time.sleep(interval)
res = condition()
if res:
return True
if error is not None:
raise Exception(error)
return False
async def wait_for_condition_async(condition, interval=0.1, timeout=10, error=None):
start = time.time()
res = await condition() if inspect.iscoroutinefunction(condition) else condition()
while not res and time.time() - start < timeout:
await asyncio.sleep(interval)
res = await condition() if inspect.iscoroutinefunction(condition) else condition()
if res:
return True
if error is not None:
raise Exception(error)
return False
def is_windows():
return os.name == "nt"
def get_test_dir():
return os.path.dirname(os.path.realpath(__file__))
def get_ignite_dirs():
ignite_home = os.getenv("IGNITE_HOME")
if ignite_home is not None:
yield ignite_home
proj_dir = os.path.abspath(os.path.join(get_test_dir(), "..", ".."))
yield os.path.join(proj_dir, "ignite")
yield os.path.join(proj_dir, "incubator_ignite")
def get_ignite_runner():
ext = ".bat" if is_windows() else ".sh"
for ignite_dir in get_ignite_dirs():
runner = os.path.join(ignite_dir, "bin", "ignite" + ext)
print("Probing Ignite runner at '{0}'...".format(runner))
if os.path.exists(runner):
return runner
raise Exception(f"Ignite not found. IGNITE_HOME {os.getenv('IGNITE_HOME')}")
def check_server_started(idx=1):
pattern = re.compile('^Topology snapshot.*')
for log_file in get_log_files(idx):
with open(log_file) as f:
for line in f.readlines():
if pattern.match(line):
return True
return False
def kill_process_tree(pid):
if is_windows():
subprocess.call(['taskkill', '/F', '/T', '/PID', str(pid)])
else:
children = psutil.Process(pid).children(recursive=True)
for child in children:
os.kill(child.pid, signal.SIGKILL)
os.kill(pid, signal.SIGKILL)
templateLoader = jinja2.FileSystemLoader(searchpath=os.path.join(get_test_dir(), "config"))
templateEnv = jinja2.Environment(loader=templateLoader)
def create_config_file(tpl_name, file_name, **kwargs):
template = templateEnv.get_template(tpl_name)
with open(os.path.join(get_test_dir(), "config", file_name), mode='w') as f:
f.write(template.render(**kwargs))
def start_ignite(idx=1, debug=False, use_ssl=False, use_auth=False, use_persistence=False):
clear_logs(idx)
runner = get_ignite_runner()
env = os.environ.copy()
if debug:
env["JVM_OPTS"] = "-Djava.net.preferIPv4Stack=true -Xdebug -Xnoagent -Djava.compiler=NONE " \
"-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005 "
if use_auth:
use_persistence = True
params = {
'ignite_instance_idx': str(idx),
'ignite_client_port': 10800 + idx,
'use_ssl': use_ssl,
'use_auth': use_auth,
'use_persistence': use_persistence,
}
create_config_file('log4j.xml.jinja2', f'log4j-{idx}.xml', **params)
create_config_file('ignite-config.xml.jinja2', f'ignite-config-{idx}.xml', **params)
ignite_cmd = [runner, os.path.join(get_test_dir(), "config", f'ignite-config-{idx}.xml')]
print("Starting Ignite server node:", ignite_cmd)
srv = subprocess.Popen(ignite_cmd, env=env, cwd=get_test_dir())
started = wait_for_condition(lambda: check_server_started(idx), timeout=60)
if started:
return srv
kill_process_tree(srv.pid)
raise Exception("Failed to start Ignite: timeout while trying to connect")
def start_ignite_gen(idx=1, use_ssl=False, use_auth=False, use_persistence=False):
srv = start_ignite(idx, use_ssl=use_ssl, use_auth=use_auth, use_persistence=use_persistence)
try:
yield srv
finally:
kill_process_tree(srv.pid)
def get_log_files(idx=1):
logs_pattern = os.path.join(get_test_dir(), "logs", "ignite-log-{0}*.txt".format(idx))
return glob.glob(logs_pattern)
def clear_ignite_work_dir():
for ignite_dir in get_ignite_dirs():
work_dir = os.path.join(ignite_dir, 'work')
if os.path.exists(work_dir):
shutil.rmtree(work_dir, ignore_errors=True)
def clear_logs(idx=1):
for f in get_log_files(idx):
os.remove(f)