blob: ff098fd79941d39ac63da13e3130864cb5bc25a1 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import psutil
import signal
import subprocess
import time
import pytest
import pyignite_dbapi
server_host = os.getenv("IGNITE_CLUSTER_HOST", '127.0.0.1')
server_addresses_invalid = [server_host + ':10000']
server_addresses_basic = [server_host + ':10942', server_host + ':10943']
server_addresses_ssl_basic = [server_host + ':10944']
server_addresses_ssl_client_auth = [server_host + ':10945']
server_addresses_all = server_addresses_basic + server_addresses_ssl_basic + server_addresses_ssl_client_auth
def wait_for_condition(condition, interval=0.1, timeout=10, error=None):
start = time.time()
res = condition()
while not res and time.time() - start < timeout:
time.sleep(interval)
res = condition()
if res:
return True
if error is not None:
raise Exception(error)
return False
def is_windows():
return os.name == "nt"
def get_test_dir():
return os.path.dirname(os.path.realpath(__file__))
def get_proj_dir():
return os.path.abspath(os.path.join(get_test_dir(), "..", "..", "..", ".."))
def get_ignite_dirs():
ignite_home = os.getenv("IGNITE_HOME")
if ignite_home is not None:
yield ignite_home
yield get_proj_dir()
def get_ignite_runner():
ext = ".bat" if is_windows() else ""
for ignite_dir in get_ignite_dirs():
runner = os.path.join(ignite_dir, "gradlew" + ext)
print("Probing Ignite runner at '{0}'...".format(runner))
if os.path.exists(runner):
return runner
raise Exception("Ignite not found. Please make sure your IGNITE_HOME environment variable points to directory with "
"a valid Ignite instance")
def kill_process_tree(pid):
if is_windows():
subprocess.call(['taskkill', '/F', '/T', '/PID', str(pid)])
else:
children = psutil.Process(pid).children(recursive=True)
for child in children:
os.kill(child.pid, signal.SIGKILL)
os.kill(pid, signal.SIGKILL)
# noinspection PyBroadException
def check_server_started(addr: str) -> bool:
try:
conn = pyignite_dbapi.connect(address=[addr], timeout=1)
except pyignite_dbapi.Error:
return False
conn.close()
return True
def check_cluster_started() -> bool:
for addr in server_addresses_basic:
if not check_server_started(addr):
return False
return True
def start_cluster(debug=False, jvm_opts='') -> subprocess.Popen:
runner = get_ignite_runner()
env = os.environ.copy()
env['JVM_OPTS'] = env.get('JVM_OPTS', '') + jvm_opts
if debug:
env['JVM_OPTS'] = env.get('JVM_OPTS', '') + \
'-Djava.net.preferIPv4Stack=true -Xdebug -Xnoagent -Djava.compiler=NONE ' \
'-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005 '
ignite_cmd = [runner, ':ignite-runner:runnerPlatformTest', '--no-daemon', '-x', 'compileJava', '-x',
'compileTestFixturesJava', '-x', 'compileIntegrationTestJava', '-x', 'compileTestJava']
print('Starting Ignite runner:', ignite_cmd)
ignite_dir = next(get_ignite_dirs())
if ignite_dir is None:
raise Exception('Can not resolve an Ignite project directory')
cluster = subprocess.Popen(ignite_cmd, env=env, cwd=ignite_dir)
for addr in server_addresses_basic:
started = wait_for_condition(lambda: check_server_started(addr), timeout=180)
if not started:
kill_process_tree(cluster.pid)
raise Exception('Failed to start Ignite Cluster: timeout while trying to connect')
return cluster
def start_cluster_gen(debug=False):
srv = start_cluster(debug=debug)
try:
yield srv
finally:
kill_process_tree(srv.pid)
def get_row(index):
return index, f'Value-{index * 2}', index / 2.0
def row_generator(begin, rows_num):
for i in range(begin, begin + rows_num):
yield get_row(i)
def create_and_populate_test_table(cursor, rows_num, table_name, batch_size=1):
cursor.execute(f'drop table if exists {table_name}')
cursor.execute(f'create table {table_name}(id int primary key, data varchar, fl double)')
if batch_size == 1:
for i in range(rows_num):
cursor.execute(f"insert into {table_name} values (?, ?, ?)", params=get_row(i))
else:
batch = 0
for batch in range(rows_num // batch_size):
cursor.executemany(f"insert into {table_name} values(?, ?, ?)",
list(row_generator(batch * batch_size, batch_size)))
if rows_num % batch_size:
cursor.executemany(f"insert into {table_name} values(?, ?, ?)",
list(row_generator(batch * batch_size, rows_num % batch_size)))
def check_row(i, row):
assert len(row) == 3
assert row[0] == i
assert row[1] == f'Value-{i * 2}'
assert row[2] == pytest.approx(i / 2.0)