blob: 69a65855a0afa5e25883a6114a679dcb9977cd03 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import logging
import socket
import uuid
from unittest.mock import patch, Mock
from cassandra import ConsistencyLevel, DriverException, Timeout, Unavailable, RequestExecutionException, ReadTimeout, WriteTimeout, CoordinationFailure, ReadFailure, WriteFailure, FunctionFailure, AlreadyExists,\
InvalidRequest, Unauthorized, AuthenticationFailed, OperationTimedOut, UnsupportedOperation, RequestValidationException, ConfigurationException, ProtocolVersion
from cassandra.cluster import _Scheduler, Session, Cluster, default_lbp_factory, \
ExecutionProfile, _ConfigMode, EXEC_PROFILE_DEFAULT
from cassandra.connection import SniEndPoint, SniEndPointFactory
from cassandra.pool import Host
from cassandra.policies import HostDistance, RetryPolicy, RoundRobinPolicy, DowngradingConsistencyRetryPolicy, SimpleConvictionPolicy
from cassandra.query import SimpleStatement, named_tuple_factory, tuple_factory
from tests.unit.utils import mock_session_pools
from tests import connection_class
log = logging.getLogger(__name__)
class ExceptionTypeTest(unittest.TestCase):
    """Guards the inheritance relationships between the driver's exception classes."""

    def test_exception_types(self):
        """
        PYTHON-443
        Sanity check to ensure we don't unintentionally change class hierarchy of exception types
        """
        # Each row: (exception class, base classes it must derive from, in check order).
        expected_bases = (
            (Unavailable, (DriverException, RequestExecutionException)),
            (ReadTimeout, (DriverException, RequestExecutionException, Timeout)),
            (WriteTimeout, (DriverException, RequestExecutionException, Timeout)),
            (CoordinationFailure, (DriverException, RequestExecutionException)),
            (ReadFailure, (DriverException, RequestExecutionException, CoordinationFailure)),
            (WriteFailure, (DriverException, RequestExecutionException, CoordinationFailure)),
            (FunctionFailure, (DriverException, RequestExecutionException)),
            (RequestValidationException, (DriverException,)),
            (ConfigurationException, (DriverException, RequestValidationException)),
            (AlreadyExists, (DriverException, RequestValidationException, ConfigurationException)),
            (InvalidRequest, (DriverException, RequestValidationException)),
            (Unauthorized, (DriverException, RequestValidationException)),
            (AuthenticationFailed, (DriverException,)),
            (OperationTimedOut, (DriverException,)),
            (UnsupportedOperation, (DriverException,)),
        )
        for exception_type, bases in expected_bases:
            for base in bases:
                self.assertTrue(issubclass(exception_type, base))
class MockOrderedPolicy(RoundRobinPolicy):
    """RoundRobinPolicy variant whose query plan is ordered by SNI server hostname."""

    # Defined on the class, so the same set object is seen through every
    # instance until an instance rebinds the name.
    all_hosts = set()

    def make_query_plan(self, working_keyspace=None, query=None):
        """
        Return the hosts in ``all_hosts`` as a list sorted by
        ``host.endpoint.ssl_options['server_hostname']``.

        ``working_keyspace`` and ``query`` are accepted but not used.
        """
        def server_hostname(host):
            return host.endpoint.ssl_options['server_hostname']

        return sorted(self.all_hosts, key=server_hostname)
class ClusterTest(unittest.TestCase):
    """Tests for ``Cluster`` construction and settings; no Session is created here."""

    def test_tuple_for_contact_points(self):
        """
        Contact points given as ``(host, port)`` keep their own port, while a
        bare host string uses the ``port`` keyword argument.
        """
        cluster = Cluster(contact_points=[('localhost', 9045), ('127.0.0.2', 9046), '127.0.0.3'], port=9999)
        # getaddrinfo may return several entries for "localhost"; element 4 of
        # each entry is the sockaddr, whose first item is the address string.
        localhost_addr = {info[4][0] for info in socket.getaddrinfo("localhost", 80)}
        for cp in cluster.endpoints_resolved:
            if cp.address in localhost_addr:
                self.assertEqual(cp.port, 9045)
            elif cp.address == '127.0.0.2':
                self.assertEqual(cp.port, 9046)
            else:
                self.assertEqual(cp.address, '127.0.0.3')
                self.assertEqual(cp.port, 9999)

    def test_invalid_contact_point_types(self):
        """A ``None`` contact point raises ValueError; a plain string instead of a sequence raises TypeError."""
        with self.assertRaises(ValueError):
            Cluster(contact_points=[None], protocol_version=4, connect_timeout=1)
        with self.assertRaises(TypeError):
            Cluster(contact_points="not a sequence", protocol_version=4, connect_timeout=1)

    def test_requests_in_flight_threshold(self):
        """Out-of-range min/max requests-per-connection values are rejected with ValueError."""
        d = HostDistance.LOCAL
        mn = 3
        mx = 5
        c = Cluster(protocol_version=2)
        c.set_min_requests_per_connection(d, mn)
        c.set_max_requests_per_connection(d, mx)
        # min underflow, max, overflow
        for n in (-1, mx, 127):
            self.assertRaises(ValueError, c.set_min_requests_per_connection, d, n)
        # max underflow, under min, overflow
        for n in (0, mn, 128):
            self.assertRaises(ValueError, c.set_max_requests_per_connection, d, n)

    # Validate that at least the default LBP can create a query plan with end points that resolve
    # to different addresses initially. This may not be exactly how things play out in practice
    # (the control connection will muck with this even if nothing else does) but it should be
    # a pretty good approximation.
    def test_query_plan_for_sni_contains_unique_addresses(self):
        """Hosts sharing one SNI proxy endpoint resolve to distinct addresses across a query plan."""
        node_cnt = 5

        def _mocked_proxy_dns_resolution(self):
            # Rows shaped like socket.getaddrinfo() results. AF_INET matches the
            # IPv4 (host, port) sockaddr and, unlike AF_UNIX, is defined on
            # every platform.
            return [(socket.AF_INET, socket.SOCK_STREAM, 0, None, ('127.0.0.%s' % (i,), 9042)) for i in range(node_cnt)]

        c = Cluster()
        lbp = c.load_balancing_policy
        lbp.local_dc = "dc1"
        factory = SniEndPointFactory("proxy.foo.bar", 9042)
        for host in (Host(factory.create({"host_id": uuid.uuid4().hex, "dc": "dc1"}), SimpleConvictionPolicy) for _ in range(node_cnt)):
            lbp.on_up(host)
        with patch.object(SniEndPoint, '_resolve_proxy_addresses', _mocked_proxy_dns_resolution):
            addrs = [host.endpoint.resolve() for host in lbp.make_query_plan()]
            # single SNI endpoint should be resolved to multiple unique IP addresses
            self.assertEqual(len(addrs), len(set(addrs)))
class SchedulerTest(unittest.TestCase):
    """Regression coverage for ``cassandra.cluster._Scheduler``."""
    # TODO: this suite could be expanded; for now just adding a test covering a ticket

    @patch('time.time', return_value=3)  # always queue at same time
    @patch('cassandra.cluster._Scheduler.run')  # don't actually run the thread
    def test_event_delay_timing(self, *_):
        """
        Schedule something with a time collision to make sure the heap comparison works
        PYTHON-473
        """
        scheduler = _Scheduler(None)
        # Every pass evaluates the lambda expression again, so two *distinct*
        # callables are queued with the same delay under the frozen clock.
        # pre-473 the second call raised:
        #   "TypeError: unorderable types: function() < function()"
        for _attempt in range(2):
            scheduler.schedule(0, lambda: None)
class SessionTest(unittest.TestCase):
    """Session tests that run with ``mock_session_pools`` applied."""

    def setUp(self):
        """Skip the test when ``connection_class`` is None; otherwise initialize its reactor."""
        if connection_class is None:
            raise unittest.SkipTest('libev does not appear to be installed correctly')
        connection_class.initialize_reactor()

    # TODO: this suite could be expanded; for now just adding a test covering a PR
    @mock_session_pools
    def test_default_serial_consistency_level_ep(self, *_):
        """
        Make sure default_serial_consistency_level passes through to a query message using execution profiles.
        Also make sure Statement.serial_consistency_level overrides the default.
        PR #510
        """
        cluster = Cluster(protocol_version=4)
        session = Session(cluster, [Host("127.0.0.1", SimpleConvictionPolicy)])

        # default is None
        default_profile = cluster.profile_manager.default
        self.assertIsNone(default_profile.serial_consistency_level)

        serial_levels = (ConsistencyLevel.LOCAL_SERIAL, ConsistencyLevel.SERIAL)
        for profile_cl in (None,) + serial_levels:
            session.get_execution_profile(EXEC_PROFILE_DEFAULT).serial_consistency_level = profile_cl

            # default is passed through
            future = session.execute_async(query='')
            self.assertEqual(future.message.serial_consistency_level, profile_cl)

            # any non-None statement setting takes precedence
            for statement_cl in serial_levels:
                statement = SimpleStatement(query_string='', serial_consistency_level=statement_cl)
                future = session.execute_async(statement)
                self.assertEqual(default_profile.serial_consistency_level, profile_cl)
                self.assertEqual(future.message.serial_consistency_level, statement_cl)

    @mock_session_pools
    def test_default_serial_consistency_level_legacy(self, *_):
        """
        Make sure default_serial_consistency_level passes through to a query message using legacy settings.
        Also make sure Statement.serial_consistency_level overrides the default.
        PR #510
        """
        cluster = Cluster(protocol_version=4)
        session = Session(cluster, [Host("127.0.0.1", SimpleConvictionPolicy)])

        # default is None
        self.assertIsNone(session.default_serial_consistency_level)

        # Should fail
        with self.assertRaises(ValueError):
            session.default_serial_consistency_level = ConsistencyLevel.ANY
        with self.assertRaises(ValueError):
            session.default_serial_consistency_level = 1001

        serial_levels = (ConsistencyLevel.LOCAL_SERIAL, ConsistencyLevel.SERIAL)
        for session_cl in (None,) + serial_levels:
            session.default_serial_consistency_level = session_cl

            # any non-None statement setting takes precedence
            for statement_cl in serial_levels:
                statement = SimpleStatement(query_string='', serial_consistency_level=statement_cl)
                future = session.execute_async(statement)
                self.assertEqual(session.default_serial_consistency_level, session_cl)
                self.assertEqual(future.message.serial_consistency_level, statement_cl)
class ProtocolVersionTests(unittest.TestCase):
    """Checks ``ProtocolVersion.get_lower_supported`` and two feature predicates."""

    def test_protocol_downgrade_test(self):
        """Walk the downgrade chain from DSE_V2 down to 0, then spot-check DSE_V1 against V4."""
        # Each row: (version to downgrade from, expected next lower supported version).
        downgrade_chain = (
            (ProtocolVersion.DSE_V2, ProtocolVersion.DSE_V1),
            (ProtocolVersion.DSE_V1, ProtocolVersion.V5),
            (ProtocolVersion.V5, ProtocolVersion.V4),
            (ProtocolVersion.V4, ProtocolVersion.V3),
            (ProtocolVersion.V3, ProtocolVersion.V2),
            (ProtocolVersion.V2, ProtocolVersion.V1),
            (ProtocolVersion.V1, 0),
        )
        for version, expected_lower in downgrade_chain:
            self.assertEqual(expected_lower, ProtocolVersion.get_lower_supported(version))

        dse_v1 = ProtocolVersion.DSE_V1
        self.assertTrue(ProtocolVersion.uses_error_code_map(dse_v1))
        self.assertTrue(ProtocolVersion.uses_int_query_flags(dse_v1))

        v4 = ProtocolVersion.V4
        self.assertFalse(ProtocolVersion.uses_error_code_map(v4))
        self.assertFalse(ProtocolVersion.uses_int_query_flags(v4))
class ExecutionProfileTest(unittest.TestCase):
    """
    Tests for execution profiles and for how they interact with the legacy
    Cluster/Session configuration attributes.
    """

    def setUp(self):
        """Skip the test when ``connection_class`` is None; otherwise initialize its reactor."""
        if connection_class is None:
            raise unittest.SkipTest('libev does not appear to be installed correctly')
        connection_class.initialize_reactor()

    def _verify_response_future_profile(self, rf, prof):
        """
        Assert that response future ``rf`` carries the settings of profile ``prof``:
        load balancer, retry policy, both consistency levels, timeout and row factory.
        """
        self.assertEqual(rf._load_balancer, prof.load_balancing_policy)
        self.assertEqual(rf._retry_policy, prof.retry_policy)
        self.assertEqual(rf.message.consistency_level, prof.consistency_level)
        self.assertEqual(rf.message.serial_consistency_level, prof.serial_consistency_level)
        self.assertEqual(rf.timeout, prof.request_timeout)
        self.assertEqual(rf.row_factory, prof.row_factory)

    @mock_session_pools
    def test_default_exec_parameters(self):
        """A Cluster built with no arguments is UNCOMMITTED and its defaults match the default profile."""
        cluster = Cluster()
        self.assertEqual(cluster._config_mode, _ConfigMode.UNCOMMITTED)
        self.assertEqual(cluster.load_balancing_policy.__class__, default_lbp_factory().__class__)
        self.assertEqual(cluster.profile_manager.default.load_balancing_policy.__class__, default_lbp_factory().__class__)
        self.assertEqual(cluster.default_retry_policy.__class__, RetryPolicy)
        self.assertEqual(cluster.profile_manager.default.retry_policy.__class__, RetryPolicy)
        session = Session(cluster, hosts=[Host("127.0.0.1", SimpleConvictionPolicy)])
        self.assertEqual(session.default_timeout, 10.0)
        self.assertEqual(cluster.profile_manager.default.request_timeout, 10.0)
        self.assertEqual(session.default_consistency_level, ConsistencyLevel.LOCAL_ONE)
        self.assertEqual(cluster.profile_manager.default.consistency_level, ConsistencyLevel.LOCAL_ONE)
        self.assertEqual(session.default_serial_consistency_level, None)
        self.assertEqual(cluster.profile_manager.default.serial_consistency_level, None)
        self.assertEqual(session.row_factory, named_tuple_factory)
        self.assertEqual(cluster.profile_manager.default.row_factory, named_tuple_factory)

    @mock_session_pools
    def test_default_legacy(self):
        """In legacy mode the Cluster/Session attributes are what a request is executed with."""
        cluster = Cluster(load_balancing_policy=RoundRobinPolicy(), default_retry_policy=DowngradingConsistencyRetryPolicy())
        self.assertEqual(cluster._config_mode, _ConfigMode.LEGACY)
        session = Session(cluster, hosts=[Host("127.0.0.1", SimpleConvictionPolicy)])
        session.default_timeout = 3.7
        session.default_consistency_level = ConsistencyLevel.ALL
        session.default_serial_consistency_level = ConsistencyLevel.SERIAL
        rf = session.execute_async("query")
        expected_profile = ExecutionProfile(cluster.load_balancing_policy, cluster.default_retry_policy,
                                            session.default_consistency_level, session.default_serial_consistency_level,
                                            session.default_timeout, session.row_factory)
        self._verify_response_future_profile(rf, expected_profile)

    @mock_session_pools
    def test_default_profile(self):
        """The default profile is used unless another is named; unknown profile names raise ValueError."""
        non_default_profile = ExecutionProfile(RoundRobinPolicy(), *[object() for _ in range(2)])
        cluster = Cluster(execution_profiles={'non-default': non_default_profile})
        session = Session(cluster, hosts=[Host("127.0.0.1", SimpleConvictionPolicy)])

        self.assertEqual(cluster._config_mode, _ConfigMode.PROFILES)

        default_profile = cluster.profile_manager.profiles[EXEC_PROFILE_DEFAULT]
        rf = session.execute_async("query")
        self._verify_response_future_profile(rf, default_profile)

        rf = session.execute_async("query", execution_profile='non-default')
        self._verify_response_future_profile(rf, non_default_profile)

        for name, ep in cluster.profile_manager.profiles.items():
            self.assertEqual(ep, session.get_execution_profile(name))

        # invalid ep
        with self.assertRaises(ValueError):
            session.get_execution_profile('non-existent')

    def test_serial_consistency_level_validation(self):
        """ExecutionProfile accepts SERIAL and LOCAL_SERIAL, and rejects other values with ValueError."""
        # should pass
        ExecutionProfile(RoundRobinPolicy(), serial_consistency_level=ConsistencyLevel.SERIAL)
        ExecutionProfile(RoundRobinPolicy(), serial_consistency_level=ConsistencyLevel.LOCAL_SERIAL)

        # should not pass
        with self.assertRaises(ValueError):
            ExecutionProfile(RoundRobinPolicy(), serial_consistency_level=ConsistencyLevel.ANY)
        with self.assertRaises(ValueError):
            ExecutionProfile(RoundRobinPolicy(), serial_consistency_level=42)

    @mock_session_pools
    def test_statement_params_override_legacy(self):
        """Statement-level settings and an explicit timeout win over the legacy defaults."""
        cluster = Cluster(load_balancing_policy=RoundRobinPolicy(), default_retry_policy=DowngradingConsistencyRetryPolicy())
        self.assertEqual(cluster._config_mode, _ConfigMode.LEGACY)
        session = Session(cluster, hosts=[Host("127.0.0.1", SimpleConvictionPolicy)])

        ss = SimpleStatement("query", retry_policy=DowngradingConsistencyRetryPolicy(),
                             consistency_level=ConsistencyLevel.ALL, serial_consistency_level=ConsistencyLevel.SERIAL)
        my_timeout = 1.1234

        # Preconditions: every override must differ from the default it replaces.
        # The retry policies are compared as objects because the final
        # verification compares rf._retry_policy with the policy object; the
        # previous class-vs-instance comparison could never be equal.
        self.assertNotEqual(ss.retry_policy, cluster.default_retry_policy)
        self.assertNotEqual(ss.consistency_level, session.default_consistency_level)
        self.assertNotEqual(ss._serial_consistency_level, session.default_serial_consistency_level)
        self.assertNotEqual(my_timeout, session.default_timeout)

        rf = session.execute_async(ss, timeout=my_timeout)
        expected_profile = ExecutionProfile(load_balancing_policy=cluster.load_balancing_policy, retry_policy=ss.retry_policy,
                                            request_timeout=my_timeout, consistency_level=ss.consistency_level,
                                            serial_consistency_level=ss._serial_consistency_level)
        self._verify_response_future_profile(rf, expected_profile)

    @mock_session_pools
    def test_statement_params_override_profile(self):
        """Statement-level settings and an explicit timeout win over the named profile's values."""
        non_default_profile = ExecutionProfile(RoundRobinPolicy(), *[object() for _ in range(2)])
        cluster = Cluster(execution_profiles={'non-default': non_default_profile})
        session = Session(cluster, hosts=[Host("127.0.0.1", SimpleConvictionPolicy)])

        self.assertEqual(cluster._config_mode, _ConfigMode.PROFILES)

        # Baseline: a request that uses only the profile's own settings.
        rf = session.execute_async("query", execution_profile='non-default')

        ss = SimpleStatement("query", retry_policy=DowngradingConsistencyRetryPolicy(),
                             consistency_level=ConsistencyLevel.ALL, serial_consistency_level=ConsistencyLevel.SERIAL)
        my_timeout = 1.1234

        # Preconditions: every override must differ from what the baseline used.
        # The retry policy is checked against the baseline's retry policy; it
        # was previously compared with the load balancer's class, which could
        # never be equal.
        self.assertNotEqual(ss.retry_policy, rf._retry_policy)
        self.assertNotEqual(ss.consistency_level, rf.message.consistency_level)
        self.assertNotEqual(ss._serial_consistency_level, rf.message.serial_consistency_level)
        self.assertNotEqual(my_timeout, rf.timeout)

        rf = session.execute_async(ss, timeout=my_timeout, execution_profile='non-default')
        expected_profile = ExecutionProfile(non_default_profile.load_balancing_policy, ss.retry_policy,
                                            ss.consistency_level, ss._serial_consistency_level, my_timeout, non_default_profile.row_factory)
        self._verify_response_future_profile(rf, expected_profile)

    @mock_session_pools
    def test_no_profile_with_legacy(self):
        """Once legacy settings are in use, execution profiles cannot be supplied, added or selected."""
        # don't construct with both
        self.assertRaises(ValueError, Cluster, load_balancing_policy=RoundRobinPolicy(), execution_profiles={'a': ExecutionProfile()})
        self.assertRaises(ValueError, Cluster, default_retry_policy=DowngradingConsistencyRetryPolicy(), execution_profiles={'a': ExecutionProfile()})
        self.assertRaises(ValueError, Cluster, load_balancing_policy=RoundRobinPolicy(),
                          default_retry_policy=DowngradingConsistencyRetryPolicy(), execution_profiles={'a': ExecutionProfile()})

        # can't add after
        cluster = Cluster(load_balancing_policy=RoundRobinPolicy())
        self.assertRaises(ValueError, cluster.add_execution_profile, 'name', ExecutionProfile())

        # session settings lock out profiles
        cluster = Cluster()
        session = Session(cluster, hosts=[Host("127.0.0.1", SimpleConvictionPolicy)])
        for attr, value in (('default_timeout', 1),
                            ('default_consistency_level', ConsistencyLevel.ANY),
                            ('default_serial_consistency_level', ConsistencyLevel.SERIAL),
                            ('row_factory', tuple_factory)):
            # Reset the mode so each attribute is checked on its own.
            cluster._config_mode = _ConfigMode.UNCOMMITTED
            setattr(session, attr, value)
            self.assertRaises(ValueError, cluster.add_execution_profile, 'name' + attr, ExecutionProfile())

        # don't accept profile
        self.assertRaises(ValueError, session.execute_async, "query", execution_profile='some name here')

    @mock_session_pools
    def test_no_legacy_with_profile(self):
        """Once profiles are in use, the legacy Cluster and Session attributes cannot be set."""
        cluster_init = Cluster(execution_profiles={'name': ExecutionProfile()})
        cluster_add = Cluster()
        cluster_add.add_execution_profile('name', ExecutionProfile())
        # for clusters with profiles added either way...
        # NOTE(review): this loop used to visit cluster_init twice, so the
        # add_execution_profile() path was never exercised.
        for cluster in (cluster_init, cluster_add):
            # don't allow legacy parameters set
            for attr, value in (('default_retry_policy', RetryPolicy()),
                                ('load_balancing_policy', default_lbp_factory())):
                self.assertRaises(ValueError, setattr, cluster, attr, value)
            session = Session(cluster, hosts=[Host("127.0.0.1", SimpleConvictionPolicy)])
            for attr, value in (('default_timeout', 1),
                                ('default_consistency_level', ConsistencyLevel.ANY),
                                ('default_serial_consistency_level', ConsistencyLevel.SERIAL),
                                ('row_factory', tuple_factory)):
                self.assertRaises(ValueError, setattr, session, attr, value)

    @mock_session_pools
    def test_profile_name_value(self):
        """``execution_profile`` accepts either a registered name or an ExecutionProfile object."""
        internalized_profile = ExecutionProfile(RoundRobinPolicy(), *[object() for _ in range(2)])
        cluster = Cluster(execution_profiles={'by-name': internalized_profile})
        session = Session(cluster, hosts=[Host("127.0.0.1", SimpleConvictionPolicy)])
        self.assertEqual(cluster._config_mode, _ConfigMode.PROFILES)

        rf = session.execute_async("query", execution_profile='by-name')
        self._verify_response_future_profile(rf, internalized_profile)

        by_value = ExecutionProfile(RoundRobinPolicy(), *[object() for _ in range(2)])
        rf = session.execute_async("query", execution_profile=by_value)
        self._verify_response_future_profile(rf, by_value)

    @mock_session_pools
    def test_exec_profile_clone(self):
        """``execution_profile_clone_update`` returns a new profile and leaves its source untouched."""
        cluster = Cluster(execution_profiles={EXEC_PROFILE_DEFAULT: ExecutionProfile(), 'one': ExecutionProfile()})
        session = Session(cluster, hosts=[Host("127.0.0.1", SimpleConvictionPolicy)])

        profile_attrs = {'request_timeout': 1,
                         'consistency_level': ConsistencyLevel.ANY,
                         'serial_consistency_level': ConsistencyLevel.SERIAL,
                         'row_factory': tuple_factory,
                         'retry_policy': RetryPolicy(),
                         'load_balancing_policy': default_lbp_factory()}
        # Attributes a plain clone must share with its source by identity.
        reference_attributes = ('retry_policy', 'load_balancing_policy')

        # default and one named
        for profile in (EXEC_PROFILE_DEFAULT, 'one'):
            active = session.get_execution_profile(profile)
            clone = session.execution_profile_clone_update(profile)
            self.assertIsNot(clone, active)

            all_updated = session.execution_profile_clone_update(clone, **profile_attrs)
            self.assertIsNot(all_updated, clone)
            for attr in profile_attrs:
                self.assertEqual(getattr(clone, attr), getattr(active, attr))
                if attr in reference_attributes:
                    self.assertIs(getattr(clone, attr), getattr(active, attr))
                self.assertNotEqual(getattr(all_updated, attr), getattr(active, attr))

        # cannot clone nonexistent profile
        self.assertRaises(ValueError, session.execution_profile_clone_update, 'DOES NOT EXIST', **profile_attrs)

    def test_no_profiles_same_name(self):
        """A profile name can be registered only once, whether at init or dynamically."""
        # can override default in init
        cluster = Cluster(execution_profiles={EXEC_PROFILE_DEFAULT: ExecutionProfile(), 'one': ExecutionProfile()})

        # cannot update default
        self.assertRaises(ValueError, cluster.add_execution_profile, EXEC_PROFILE_DEFAULT, ExecutionProfile())

        # cannot update named init
        self.assertRaises(ValueError, cluster.add_execution_profile, 'one', ExecutionProfile())

        # can add new name
        cluster.add_execution_profile('two', ExecutionProfile())

        # cannot add a profile added dynamically
        self.assertRaises(ValueError, cluster.add_execution_profile, 'two', ExecutionProfile())

    def test_warning_on_no_lbp_with_contact_points_legacy_mode(self):
        """
        Test that users are warned when they instantiate a Cluster object in
        legacy mode with contact points but no load-balancing policy.

        @since 3.12.0
        @jira_ticket PYTHON-812
        @expected_result logs

        @test_category configuration
        """
        self._check_warning_on_no_lbp_with_contact_points(
            cluster_kwargs={'contact_points': ['127.0.0.1']}
        )

    def test_warning_on_no_lbp_with_contact_points_profile_mode(self):
        """
        Test that users are warned when they instantiate a Cluster object in
        execution profile mode with contact points but no load-balancing
        policy.

        @since 3.12.0
        @jira_ticket PYTHON-812
        @expected_result logs

        @test_category configuration
        """
        self._check_warning_on_no_lbp_with_contact_points(cluster_kwargs={
            'contact_points': ['127.0.0.1'],
            'execution_profiles': {EXEC_PROFILE_DEFAULT: ExecutionProfile()}
        })

    @mock_session_pools
    def _check_warning_on_no_lbp_with_contact_points(self, cluster_kwargs):
        """
        Build a Cluster from ``cluster_kwargs`` with ``cassandra.cluster.log``
        patched, and assert that exactly one warning was logged and that it
        mentions the missing load-balancing policy and the contact points.
        """
        with patch('cassandra.cluster.log') as patched_logger:
            Cluster(**cluster_kwargs)
        patched_logger.warning.assert_called_once()
        warning_message = patched_logger.warning.call_args[0][0]
        self.assertIn('please specify a load-balancing policy', warning_message)
        self.assertIn("contact_points = ['127.0.0.1']", warning_message)

    def test_no_warning_on_contact_points_with_lbp_legacy_mode(self):
        """
        Test that users aren't warned when they instantiate a Cluster object
        with contact points and a load-balancing policy in legacy mode.

        @since 3.12.0
        @jira_ticket PYTHON-812
        @expected_result no logs

        @test_category configuration
        """
        self._check_no_warning_on_contact_points_with_lbp({
            'contact_points': ['127.0.0.1'],
            'load_balancing_policy': object()
        })

    def test_no_warning_on_contact_points_with_lbp_profiles_mode(self):
        """
        Test that users aren't warned when they instantiate a Cluster object
        with contact points and a load-balancing policy in execution profile
        mode.

        @since 3.12.0
        @jira_ticket PYTHON-812
        @expected_result no logs

        @test_category configuration
        """
        ep_with_lbp = ExecutionProfile(load_balancing_policy=object())
        self._check_no_warning_on_contact_points_with_lbp(cluster_kwargs={
            'contact_points': ['127.0.0.1'],
            'execution_profiles': {
                EXEC_PROFILE_DEFAULT: ep_with_lbp
            }
        })

    @mock_session_pools
    def _check_no_warning_on_contact_points_with_lbp(self, cluster_kwargs):
        """
        Test that users aren't warned when they instantiate a Cluster object
        with contact points and a load-balancing policy.

        @since 3.12.0
        @jira_ticket PYTHON-812
        @expected_result no logs

        @test_category configuration
        """
        with patch('cassandra.cluster.log') as patched_logger:
            Cluster(**cluster_kwargs)
        patched_logger.warning.assert_not_called()

    @mock_session_pools
    def test_warning_adding_no_lbp_ep_to_cluster_with_contact_points(self):
        """Adding a profile without an explicit LBP to a cluster with contact points logs one warning."""
        ep_with_lbp = ExecutionProfile(load_balancing_policy=object())
        cluster = Cluster(
            contact_points=['127.0.0.1'],
            execution_profiles={EXEC_PROFILE_DEFAULT: ep_with_lbp})
        with patch('cassandra.cluster.log') as patched_logger:
            cluster.add_execution_profile(
                name='no_lbp',
                profile=ExecutionProfile()
            )

        patched_logger.warning.assert_called_once()
        warning_message = patched_logger.warning.call_args[0][0]
        self.assertIn('no_lbp', warning_message)
        self.assertIn('trying to add', warning_message)
        self.assertIn('please specify a load-balancing policy', warning_message)

    @mock_session_pools
    def test_no_warning_adding_lbp_ep_to_cluster_with_contact_points(self):
        """Adding a profile that has an explicit LBP to a cluster with contact points logs no warning."""
        ep_with_lbp = ExecutionProfile(load_balancing_policy=object())
        cluster = Cluster(
            contact_points=['127.0.0.1'],
            execution_profiles={EXEC_PROFILE_DEFAULT: ep_with_lbp})
        with patch('cassandra.cluster.log') as patched_logger:
            cluster.add_execution_profile(
                name='with_lbp',
                profile=ExecutionProfile(load_balancing_policy=Mock(name='lbp'))
            )

        patched_logger.warning.assert_not_called()