blob: 1d6acd6ed0988195a602c4a2acafb191bc1612e9 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import glob
import os
import psutil
import re
import signal
import subprocess
import time
def wait_for_condition(condition, interval=0.1, timeout=10, error=None):
start = time.time()
res = condition()
while not res and time.time() - start < timeout:
time.sleep(interval)
res = condition()
if res:
return True
if error is not None:
raise Exception(error)
return False
def is_windows():
return os.name == "nt"
def get_test_dir():
return os.path.dirname(os.path.realpath(__file__))
def get_ignite_dirs():
ignite_home = os.getenv("IGNITE_HOME")
if ignite_home is not None:
yield ignite_home
proj_dir = os.path.abspath(os.path.join(get_test_dir(), "..", ".."))
yield os.path.join(proj_dir, "ignite")
yield os.path.join(proj_dir, "incubator_ignite")
def get_ignite_runner():
ext = ".bat" if is_windows() else ".sh"
for ignite_dir in get_ignite_dirs():
runner = os.path.join(ignite_dir, "bin", "ignite" + ext)
print("Probing Ignite runner at '{0}'...".format(runner))
if os.path.exists(runner):
return runner
raise Exception(f"Ignite not found. IGNITE_HOME {os.getenv('IGNITE_HOME')}")
def get_ignite_config_path(use_ssl=False):
if use_ssl:
file_name = "ignite-config-ssl.xml"
else:
file_name = "ignite-config.xml"
return os.path.join(get_test_dir(), "config", file_name)
def check_server_started(idx=1):
log_file = os.path.join(get_test_dir(), "logs", f"ignite-log-{idx}.txt")
if not os.path.exists(log_file):
return False
pattern = re.compile('^Topology snapshot.*')
with open(log_file) as f:
for line in f.readlines():
if pattern.match(line):
return True
return False
def kill_process_tree(pid):
if is_windows():
subprocess.call(['taskkill', '/F', '/T', '/PID', str(pid)])
else:
children = psutil.Process(pid).children(recursive=True)
for child in children:
os.kill(child.pid, signal.SIGKILL)
os.kill(pid, signal.SIGKILL)
def _start_ignite(idx=1, debug=False, use_ssl=False):
clear_logs(idx)
runner = get_ignite_runner()
env = os.environ.copy()
env['IGNITE_INSTANCE_INDEX'] = str(idx)
env['IGNITE_CLIENT_PORT'] = str(10800 + idx)
if debug:
env["JVM_OPTS"] = "-Djava.net.preferIPv4Stack=true -Xdebug -Xnoagent -Djava.compiler=NONE " \
"-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005 "
ignite_cmd = [runner, get_ignite_config_path(use_ssl)]
print("Starting Ignite server node:", ignite_cmd)
srv = subprocess.Popen(ignite_cmd, env=env, cwd=get_test_dir())
started = wait_for_condition(lambda: check_server_started(idx), timeout=30)
if started:
return srv
kill_process_tree(srv.pid)
raise Exception("Failed to start Ignite: timeout while trying to connect")
def start_ignite_gen(idx=1, use_ssl=False):
srv = _start_ignite(idx, use_ssl=use_ssl)
yield srv
kill_process_tree(srv.pid)
def get_log_files(idx=1):
logs_pattern = os.path.join(get_test_dir(), "logs", "ignite-log-{0}*.txt".format(idx))
return glob.glob(logs_pattern)
def clear_logs(idx=1):
for f in get_log_files(idx):
os.remove(f)
def read_log_file(file, idx):
i = -1
with open(file) as f:
lines = f.readlines()
for line in lines:
i += 1
if i < read_log_file.last_line[idx]:
continue
if i > read_log_file.last_line[idx]:
read_log_file.last_line[idx] = i
# Example: Client request received [reqId=1, addr=/127.0.0.1:51694,
# req=org.apache.ignite.internal.processors.platform.client.cache.ClientCachePutRequest@1f33101e]
res = re.match("Client request received .*?req=org.apache.ignite.internal.processors."
"platform.client.cache.ClientCache([a-zA-Z]+)Request@", line)
if res is not None:
yield res.group(1)
def get_request_grid_idx(message="Get"):
res = -1
for i in range(1, 5):
for log_file in get_log_files(i):
for log in read_log_file(log_file, i):
if log == message:
res = i # Do not exit early to advance all log positions
return res
read_log_file.last_line = [0, 0, 0, 0, 0]