blob: 168ae0d0073bdb989c549e2f4cac5d519ddb3b44 [file]
#!/usr/bin/env python3
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import json
import shutil
import time
from unittest import SkipTest, TestCase, main
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen

import pulsar
from pulsar import AutoClusterFailover, Client, MessageId, ServiceInfo
try:
from testcontainers.core.container import DockerContainer
except ImportError:
DockerContainer = None
class AutoClusterFailoverDockerTest(TestCase):
    """End-to-end test of ``AutoClusterFailover`` across two standalone brokers.

    Two Pulsar standalone containers (primary and secondary) are started via
    testcontainers and configured through the admin REST API.  The test then
    kills the primary broker and asserts that a producer created with an
    ``AutoClusterFailover`` service URL provider transparently switches to the
    secondary cluster and keeps delivering messages.

    Requires a working ``docker`` binary and the ``testcontainers`` package;
    the whole class is skipped when either is missing.
    """

    # Host-mapped broker/admin endpoints for the two standalone containers.
    PRIMARY_URL = "pulsar://localhost:16650"
    PRIMARY_ADMIN_URL = "http://localhost:18080"
    SECONDARY_URL = "pulsar://localhost:26650"
    SECONDARY_ADMIN_URL = "http://localhost:28080"
    # Maximum time to wait for a single message in read_next().
    RECEIVE_TIMEOUT_MS = 10000
    # Maximum time to wait for the client to switch clusters after a kill.
    FAILOVER_WAIT_SECONDS = 30

    @classmethod
    def setUpClass(cls):
        """Start both broker containers and configure cluster/tenant/namespace.

        Raises:
            SkipTest: when docker or testcontainers is unavailable.
        """
        if shutil.which("docker") is None:
            raise SkipTest("docker is required for auto_cluster_failover_test")
        if DockerContainer is None:
            raise SkipTest("testcontainers is required for auto_cluster_failover_test")
        try:
            cls.primary_container = cls._create_container(
                service_port=16650,
                admin_port=18080,
                cluster_name="standalone-0",
            )
            cls.secondary_container = cls._create_container(
                service_port=26650,
                admin_port=28080,
                cluster_name="standalone-1",
            )
            cls.primary_container.start()
            cls.secondary_container.start()
            # A broker is ready once its admin endpoint starts answering.
            cls._wait_for_http(cls.PRIMARY_ADMIN_URL + "/metrics")
            cls._wait_for_http(cls.SECONDARY_ADMIN_URL + "/metrics")
            cls._configure_cluster(
                cls.PRIMARY_ADMIN_URL,
                cls.PRIMARY_URL,
                "standalone-0",
            )
            cls._configure_cluster(
                cls.SECONDARY_ADMIN_URL,
                cls.SECONDARY_URL,
                "standalone-1",
            )
        except Exception:
            cls._print_container_logs()
            # unittest does NOT call tearDownClass when setUpClass raises, so
            # clean up here to avoid leaking running containers.
            cls._stop_containers()
            raise

    @classmethod
    def tearDownClass(cls):
        """Stop whichever containers were started."""
        cls._stop_containers()

    @classmethod
    def _stop_containers(cls):
        """Best-effort stop of both containers, tolerating partial setup.

        Each stop is wrapped individually so a failure stopping one container
        cannot prevent the other from being stopped.
        """
        for container in (
            getattr(cls, "primary_container", None),
            getattr(cls, "secondary_container", None),
        ):
            if container is None:
                continue
            try:
                container.stop()
            except Exception:
                # Cleanup is best-effort; never mask the test's real outcome.
                pass

    @classmethod
    def _create_container(cls, service_port, admin_port, cluster_name):
        """Build (but do not start) a Pulsar standalone container.

        The broker's internal 6650/8080 ports are bound to the given host
        ports; ``advertisedListeners`` makes the broker reachable through the
        host-mapped service port.

        Args:
            service_port: host port mapped to the broker's 6650.
            admin_port: host port mapped to the broker's 8080.
            cluster_name: value for the ``clusterName`` broker setting.
        """
        return (
            DockerContainer("apachepulsar/pulsar:latest")
            .with_env("clusterName", cluster_name)
            .with_env("advertisedAddress", "localhost")
            .with_env("advertisedListeners", f"external:pulsar://localhost:{service_port}")
            .with_env("PULSAR_MEM", "-Xms512m -Xmx512m -XX:MaxDirectMemorySize=256m")
            .with_bind_ports(6650, service_port)
            .with_bind_ports(8080, admin_port)
            .with_command(
                'bash -c "bin/apply-config-from-env.py conf/standalone.conf && '
                'exec bin/pulsar standalone -nss -nfw"'
            )
        )

    @classmethod
    def _print_container_logs(cls):
        """Dump container logs to stdout for post-mortem diagnosis."""
        for container in (
            getattr(cls, "primary_container", None),
            getattr(cls, "secondary_container", None),
        ):
            if container is None:
                continue
            try:
                # get_wrapped_container() itself can raise for a container
                # that was created but never started; keep it inside the try
                # so we never mask the original setup failure.
                wrapped = container.get_wrapped_container()
                print(wrapped.logs().decode("utf-8", errors="replace"))
            except Exception:
                pass

    @classmethod
    def _wait_for_http(cls, url, timeout_seconds=180):
        """Poll *url* until it returns HTTP 200 or *timeout_seconds* elapse.

        Raises:
            AssertionError: if the endpoint never becomes healthy in time.
        """
        deadline = time.time() + timeout_seconds
        last_error = None
        while time.time() < deadline:
            try:
                with urlopen(url, timeout=5) as response:
                    if response.status == 200:
                        return
            except OSError as e:
                # URLError is a subclass of OSError, so this also covers
                # connection refusals while the broker is still booting.
                last_error = e
            time.sleep(1)
        raise AssertionError(f"Timed out waiting for {url}: {last_error}")

    @staticmethod
    def _http_put(url, data):
        """PUT a JSON *data* payload to *url*, treating 409 Conflict as success.

        409 means the resource already exists, which is fine for this
        idempotent setup code.
        """
        request = Request(url, data=data.encode("utf-8"), method="PUT")
        request.add_header("Content-Type", "application/json")
        try:
            with urlopen(request, timeout=10):
                return
        except HTTPError as e:
            if e.code == 409:
                return
            raise

    @classmethod
    def _configure_cluster(cls, admin_url, service_url, cluster_name):
        """Register the cluster and grant public/default access to it.

        Creates (idempotently) the cluster metadata, the ``public`` tenant
        and the ``public/default`` namespace on the broker at *admin_url*.
        """
        cls._http_put(
            f"{admin_url}/admin/v2/clusters/{cluster_name}",
            json.dumps({
                "serviceUrl": f"{admin_url}/",
                "brokerServiceUrl": f"{service_url}/",
            }),
        )
        cls._http_put(
            f"{admin_url}/admin/v2/tenants/public",
            json.dumps({
                "adminRoles": ["anonymous"],
                "allowedClusters": [cluster_name],
            }),
        )
        cls._http_put(
            f"{admin_url}/admin/v2/namespaces/public/default",
            json.dumps({
                "replication_clusters": [cluster_name],
            }),
        )

    @staticmethod
    def _wait_until(predicate, timeout_seconds, description):
        """Poll *predicate* until it is truthy or *timeout_seconds* elapse.

        Raises:
            AssertionError: with *description* when the deadline passes.
        """
        deadline = time.time() + timeout_seconds
        while time.time() < deadline:
            if predicate():
                return
            time.sleep(0.2)
        raise AssertionError(f"Timed out waiting for {description}")

    @staticmethod
    def _ensure_topic_exists(service_url, topic):
        """Create *topic* on the cluster at *service_url* by opening a producer."""
        client = Client(service_url)
        try:
            producer = client.create_producer(topic)
            producer.close()
        finally:
            # Always release the client, even if producer creation fails.
            client.close()

    def test_producer_failover_between_two_standalones(self):
        """Kill the primary broker and verify the producer fails over."""
        topic = "test-auto-cluster-failover-%d" % int(time.time() * 1000)
        message_before_failover = b"before-failover"
        message_after_failover = b"after-failover"
        # Pre-create the topic on both clusters so the readers can attach.
        self._ensure_topic_exists(self.PRIMARY_URL, topic)
        self._ensure_topic_exists(self.SECONDARY_URL, topic)
        primary_client = Client(self.PRIMARY_URL)
        primary_reader = primary_client.create_reader(topic, MessageId.earliest)
        secondary_client = Client(self.SECONDARY_URL)
        secondary_reader = secondary_client.create_reader(topic, MessageId.earliest)
        failover_client = Client(
            AutoClusterFailover(
                ServiceInfo(self.PRIMARY_URL),
                [ServiceInfo(self.SECONDARY_URL)],
                check_interval_ms=200,
                failover_threshold=1,
                switch_back_threshold=1,
            ),
            operation_timeout_seconds=10,
        )
        producer = failover_client.create_producer(
            topic,
            send_timeout_millis=3000,
            batching_enabled=False,
        )
        self.assertEqual(failover_client.get_service_info().service_url, self.PRIMARY_URL)
        producer.send(message_before_failover)
        self.assertEqual(primary_reader.read_next(self.RECEIVE_TIMEOUT_MS).data(), message_before_failover)
        primary_reader.close()
        primary_client.close()
        # Take down the primary; with failover_threshold=1 and a 200ms probe
        # interval, the policy should switch almost immediately.
        self.primary_container.get_wrapped_container().kill(signal="SIGTERM")
        self._wait_until(
            lambda: failover_client.get_service_info().service_url == self.SECONDARY_URL,
            self.FAILOVER_WAIT_SECONDS,
            "client service info to switch to the secondary broker",
        )
        # The producer may still be reconnecting right after the switch, so
        # retry sends until one succeeds or the deadline passes.
        last_error = None
        deadline = time.time() + self.FAILOVER_WAIT_SECONDS
        while time.time() < deadline:
            try:
                producer.send(message_after_failover)
                break
            except pulsar.PulsarException as e:
                last_error = e
                time.sleep(0.5)
        else:
            # while/else: reached only when the loop exhausted without break.
            raise AssertionError(f"Producer did not recover after failover: {last_error}")
        self.assertEqual(secondary_reader.read_next(self.RECEIVE_TIMEOUT_MS).data(), message_after_failover)
        self.assertEqual(failover_client.get_service_info().service_url, self.SECONDARY_URL)
        producer.close()
        failover_client.close()
        secondary_reader.close()
        secondary_client.close()
if __name__ == "__main__":
main()