blob: 0a969e8462f0eb88ae6ceeae8c6068ce2bd6b4e5 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Shared fixtures for Fluss Python integration tests.
If FLUSS_BOOTSTRAP_SERVERS is set, tests connect to an existing cluster.
Otherwise, a Fluss cluster is started automatically via testcontainers.
Run with:
uv run maturin develop && uv run pytest test/ -v
"""
import os
import socket
import time
import pytest
import pytest_asyncio
import fluss
FLUSS_IMAGE = "apache/fluss"
FLUSS_VERSION = "0.8.0-incubating"
BOOTSTRAP_SERVERS_ENV = os.environ.get("FLUSS_BOOTSTRAP_SERVERS")
def _wait_for_port(host, port, timeout=60):
"""Wait for a TCP port to become available."""
start = time.time()
while time.time() - start < timeout:
try:
with socket.create_connection((host, port), timeout=1):
return
except (ConnectionRefusedError, TimeoutError, OSError):
time.sleep(1)
raise TimeoutError(f"Port {port} on {host} not available after {timeout}s")
@pytest.fixture(scope="session")
def fluss_cluster():
"""Start a Fluss cluster using testcontainers, or use an existing one."""
if BOOTSTRAP_SERVERS_ENV:
yield (BOOTSTRAP_SERVERS_ENV, BOOTSTRAP_SERVERS_ENV)
return
from testcontainers.core.container import DockerContainer
from testcontainers.core.network import Network
network = Network()
network.create()
zookeeper = (
DockerContainer("zookeeper:3.9.2")
.with_network(network)
.with_name("zookeeper-python-test")
)
sasl_jaas = (
"org.apache.fluss.security.auth.sasl.plain.PlainLoginModule required"
' user_admin="admin-secret" user_alice="alice-secret";'
)
coordinator_props = "\n".join([
"zookeeper.address: zookeeper-python-test:2181",
"bind.listeners: INTERNAL://coordinator-server-python-test:0,"
" CLIENT://coordinator-server-python-test:9123,"
" PLAIN_CLIENT://coordinator-server-python-test:9223",
"advertised.listeners: CLIENT://localhost:9123,"
" PLAIN_CLIENT://localhost:9223",
"internal.listener.name: INTERNAL",
"security.protocol.map: CLIENT:sasl",
"security.sasl.enabled.mechanisms: plain",
f"security.sasl.plain.jaas.config: {sasl_jaas}",
"netty.server.num-network-threads: 1",
"netty.server.num-worker-threads: 3",
])
coordinator = (
DockerContainer(f"{FLUSS_IMAGE}:{FLUSS_VERSION}")
.with_network(network)
.with_name("coordinator-server-python-test")
.with_bind_ports(9123, 9123)
.with_bind_ports(9223, 9223)
.with_command("coordinatorServer")
.with_env("FLUSS_PROPERTIES", coordinator_props)
)
tablet_props = "\n".join([
"zookeeper.address: zookeeper-python-test:2181",
"bind.listeners: INTERNAL://tablet-server-python-test:0,"
" CLIENT://tablet-server-python-test:9123,"
" PLAIN_CLIENT://tablet-server-python-test:9223",
"advertised.listeners: CLIENT://localhost:9124,"
" PLAIN_CLIENT://localhost:9224",
"internal.listener.name: INTERNAL",
"security.protocol.map: CLIENT:sasl",
"security.sasl.enabled.mechanisms: plain",
f"security.sasl.plain.jaas.config: {sasl_jaas}",
"tablet-server.id: 0",
"netty.server.num-network-threads: 1",
"netty.server.num-worker-threads: 3",
])
tablet_server = (
DockerContainer(f"{FLUSS_IMAGE}:{FLUSS_VERSION}")
.with_network(network)
.with_name("tablet-server-python-test")
.with_bind_ports(9123, 9124)
.with_bind_ports(9223, 9224)
.with_command("tabletServer")
.with_env("FLUSS_PROPERTIES", tablet_props)
)
zookeeper.start()
coordinator.start()
tablet_server.start()
_wait_for_port("localhost", 9123)
_wait_for_port("localhost", 9124)
_wait_for_port("localhost", 9223)
_wait_for_port("localhost", 9224)
# Extra wait for cluster to fully initialize
time.sleep(10)
# (plaintext_bootstrap, sasl_bootstrap)
yield ("127.0.0.1:9223", "127.0.0.1:9123")
tablet_server.stop()
coordinator.stop()
zookeeper.stop()
network.remove()
@pytest_asyncio.fixture(scope="session")
async def connection(fluss_cluster):
"""Session-scoped connection to the Fluss cluster (plaintext)."""
plaintext_addr, _sasl_addr = fluss_cluster
config = fluss.Config({"bootstrap.servers": plaintext_addr})
conn = await fluss.FlussConnection.create(config)
yield conn
conn.close()
@pytest.fixture(scope="session")
def sasl_bootstrap_servers(fluss_cluster):
"""Bootstrap servers for the SASL listener."""
_plaintext_addr, sasl_addr = fluss_cluster
return sasl_addr
@pytest.fixture(scope="session")
def plaintext_bootstrap_servers(fluss_cluster):
"""Bootstrap servers for the plaintext (non-SASL) listener."""
plaintext_addr, _sasl_addr = fluss_cluster
return plaintext_addr
@pytest_asyncio.fixture(scope="session")
async def admin(connection):
"""Session-scoped admin client."""
return await connection.get_admin()