blob: 855af55911a1dbe0f56360300f63489c247826e1 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Pytest configuration and custom hooks."""
import os
import sys
from types import SimpleNamespace
import pytest
from apache_beam.options import pipeline_options
from apache_beam.testing.test_pipeline import TestPipeline
MAX_SUPPORTED_PYTHON_VERSION = (3, 13)
def pytest_addoption(parser):
parser.addoption(
'--test-pipeline-options',
help='Options to use in test pipelines. NOTE: Tests may '
'ignore some or all of these options.')
# See pytest.ini for main collection rules.
collect_ignore_glob = [
'*_py3%d.py' % minor for minor in range(
sys.version_info.minor + 1, MAX_SUPPORTED_PYTHON_VERSION[1] + 1)
]
@pytest.fixture(scope="session", autouse=True)
def configure_beam_rpc_timeouts():
"""
Configure gRPC and RPC timeouts for Beam tests
to prevent DEADLINE_EXCEEDED errors.
"""
print("\n--- Applying Beam RPC timeout configuration ---")
# Set gRPC keepalive and timeout settings
timeout_env_vars = {
'GRPC_ARG_KEEPALIVE_TIME_MS': '30000',
'GRPC_ARG_KEEPALIVE_TIMEOUT_MS': '5000',
'GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA': '0',
'GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS': '1',
'GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS': '300000',
'GRPC_ARG_HTTP2_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS': '10000',
# Additional stability settings for DinD environment
'GRPC_ARG_MAX_RECONNECT_BACKOFF_MS': '120000',
'GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS': '1000',
'GRPC_ARG_MAX_CONNECTION_IDLE_MS': '300000',
'GRPC_ARG_MAX_CONNECTION_AGE_MS': '1800000',
# Beam-specific retry and timeout settings
'BEAM_RETRY_MAX_ATTEMPTS': '5',
'BEAM_RETRY_INITIAL_DELAY_MS': '1000',
'BEAM_RETRY_MAX_DELAY_MS': '60000',
'BEAM_RUNNER_BUNDLE_TIMEOUT_MS': '300000',
# Force deterministic execution in DinD environment
'BEAM_TESTING_FORCE_SINGLE_BUNDLE': 'true',
'BEAM_TESTING_DETERMINISTIC_ORDER': 'true',
'BEAM_SDK_WORKER_PARALLELISM': '1',
'BEAM_WORKER_POOL_SIZE': '1',
'BEAM_FN_API_CONTROL_PORT': '0',
'BEAM_FN_API_DATA_PORT': '0',
# Container-specific stability settings
'PYTHONHASHSEED': '0',
'OMP_NUM_THREADS': '1',
'OPENBLAS_NUM_THREADS': '1',
# Force sequential pytest execution (CRITICAL for DinD stability)
'PYTEST_XDIST_WORKER_COUNT': '1',
'PYTEST_CURRENT_TEST_TIMEOUT': '300',
# Mock and test isolation improvements
'PYTEST_MOCK_TIMEOUT': '60',
'BEAM_TEST_ISOLATION_MODE': 'strict',
}
for key, value in timeout_env_vars.items():
os.environ[key] = value
print(f"Set {key}={value}")
print("Successfully configured Beam RPC timeouts")
@pytest.fixture(autouse=True)
def ensure_clean_state():
"""
Ensure clean state before each test
to prevent cross-test contamination.
"""
import gc
import threading
import time
# Force garbage collection to clean up any lingering resources
gc.collect()
# Log active thread count for debugging
thread_count = threading.active_count()
if thread_count > 50: # Increased threshold since we see 104 threads
print(f"Warning: {thread_count} active threads detected before test")
# Force a brief pause to let threads settle
time.sleep(0.5)
gc.collect()
yield
# Enhanced cleanup after test
try:
# Force more aggressive cleanup
gc.collect()
# Brief pause to let any async operations complete
time.sleep(0.1)
# Additional garbage collection
gc.collect()
except Exception as e:
print(f"Warning: Cleanup error: {e}")
@pytest.fixture(autouse=True)
def enhance_mock_stability():
"""Enhance mock stability in DinD environment."""
import time
# Brief pause before test to ensure clean mock state
time.sleep(0.05)
yield
# Brief pause after test to let mocks clean up
time.sleep(0.05)
def pytest_configure(config):
"""Saves options added in pytest_addoption for later use.
This is necessary since pytest-xdist workers do not have the
same sys.argv as the main pytest invocation.
xdist does seem to pickle TestPipeline
"""
# for the entire test session.
print("\n--- Applying global testcontainers timeout configuration ---")
try:
from testcontainers.core import waiting_utils
waiting_utils.config = SimpleNamespace(
timeout=int(os.getenv("TC_TIMEOUT", "120")),
max_tries=int(os.getenv("TC_MAX_TRIES", "120")),
sleep_time=float(os.getenv("TC_SLEEP_TIME", "1")),
)
print("Successfully set waiting utils config")
except ModuleNotFoundError:
print("The testcontainers library is not installed.")
TestPipeline.pytest_test_pipeline_options = config.getoption(
'test_pipeline_options', default='')
# Enable optional type checks on all tests.
pipeline_options.enable_all_additional_type_checks()