blob: 82b0818faeced86c3a96f1e6d92823c46f3ef572 [file]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
from packaging.version import Version
from cassandra.cluster import (EXEC_PROFILE_DEFAULT,
ContinuousPagingOptions, ExecutionProfile,
ProtocolVersion)
from cassandra.cqlengine import columns, connection, models
from cassandra.cqlengine.management import drop_table, sync_table
from tests.integration import (DSE_VERSION, greaterthanorequaldse51,
greaterthanorequaldse60, requiredse, TestCluster)
class TestMultiKeyModel(models.Model):
partition = columns.Integer(primary_key=True)
cluster = columns.Integer(primary_key=True)
count = columns.Integer(required=False)
text = columns.Text(required=False)
def setup_module():
if DSE_VERSION:
sync_table(TestMultiKeyModel)
for i in range(1000):
TestMultiKeyModel.create(partition=i, cluster=i, count=5, text="text to write")
def teardown_module():
if DSE_VERSION:
drop_table(TestMultiKeyModel)
@requiredse
class BasicConcurrentTests():
required_dse_version = None
protocol_version = None
connections = set()
sane_connections = {"CONTDEFAULT"}
@classmethod
def setUpClass(cls):
if DSE_VERSION:
cls._create_cluster_with_cp_options("CONTDEFAULT", ContinuousPagingOptions())
cls._create_cluster_with_cp_options("ONEPAGE", ContinuousPagingOptions(max_pages=1))
cls._create_cluster_with_cp_options("MANYPAGES", ContinuousPagingOptions(max_pages=10))
cls._create_cluster_with_cp_options("SLOW", ContinuousPagingOptions(max_pages_per_second=1))
@classmethod
def tearDownClass(cls):
if not DSE_VERSION or DSE_VERSION < cls.required_dse_version:
return
cls.cluster_default.shutdown()
connection.set_default_connection("default")
@classmethod
def _create_cluster_with_cp_options(cls, name, cp_options):
execution_profiles = {EXEC_PROFILE_DEFAULT:
ExecutionProfile(continuous_paging_options=cp_options)}
cls.cluster_default = TestCluster(protocol_version=cls.protocol_version,
execution_profiles=execution_profiles)
cls.session_default = cls.cluster_default.connect(wait_for_all_pools=True)
connection.register_connection(name, default=True, session=cls.session_default)
cls.connections.add(name)
def test_continuous_paging_basic(self):
"""
Test to ensure that various continuous paging works with cqlengine
for session
@since DSE 2.4
@jira_ticket PYTHON-872
@expected_result various continous paging options should fetch all the results
@test_category queries
"""
for connection_name in self.sane_connections:
connection.set_default_connection(connection_name)
row = TestMultiKeyModel.get(partition=0, cluster=0)
self.assertEqual(row.partition, 0)
self.assertEqual(row.cluster, 0)
rows = TestMultiKeyModel.objects().allow_filtering()
self.assertEqual(len(rows), 1000)
def test_fetch_size(self):
"""
Test to ensure that various continuous paging works with different fetch sizes
for session
@since DSE 2.4
@jira_ticket PYTHON-872
@expected_result various continous paging options should fetch all the results
@test_category queries
"""
for connection_name in self.connections:
conn = connection._connections[connection_name]
initial_default = conn.session.default_fetch_size
self.addCleanup(
setattr,
conn.session,
"default_fetch_size",
initial_default
)
connection.set_default_connection("ONEPAGE")
for fetch_size in (2, 3, 7, 10, 99, 100, 101, 150):
connection._connections["ONEPAGE"].session.default_fetch_size = fetch_size
rows = TestMultiKeyModel.objects().allow_filtering()
self.assertEqual(fetch_size, len(rows))
connection.set_default_connection("MANYPAGES")
for fetch_size in (2, 3, 7, 10, 15):
connection._connections["MANYPAGES"].session.default_fetch_size = fetch_size
rows = TestMultiKeyModel.objects().allow_filtering()
self.assertEqual(fetch_size * 10, len(rows))
for connection_name in self.sane_connections:
connection.set_default_connection(connection_name)
for fetch_size in (2, 3, 7, 10, 99, 100, 101, 150):
connection._connections[connection_name].session.default_fetch_size = fetch_size
rows = TestMultiKeyModel.objects().allow_filtering()
self.assertEqual(1000, len(rows))
@requiredse
@greaterthanorequaldse51
class ContPagingTestsDSEV1(BasicConcurrentTests, unittest.TestCase):
@classmethod
def setUpClass(cls):
BasicConcurrentTests.required_dse_version = Version('5.1')
if not DSE_VERSION or DSE_VERSION < BasicConcurrentTests.required_dse_version:
return
BasicConcurrentTests.protocol_version = ProtocolVersion.DSE_V1
BasicConcurrentTests.setUpClass()
@requiredse
@greaterthanorequaldse60
class ContPagingTestsDSEV2(BasicConcurrentTests, unittest.TestCase):
@classmethod
def setUpClass(cls):
BasicConcurrentTests.required_dse_version = Version('6.0')
if not DSE_VERSION or DSE_VERSION < BasicConcurrentTests.required_dse_version:
return
BasicConcurrentTests.protocol_version = ProtocolVersion.DSE_V2
BasicConcurrentTests.setUpClass()
cls.connections = cls.connections.union({"SMALL_QUEUE", "BIG_QUEUE"})
cls.sane_connections = cls.sane_connections.union({"SMALL_QUEUE", "BIG_QUEUE"})
cls._create_cluster_with_cp_options("SMALL_QUEUE", ContinuousPagingOptions(max_queue_size=2))
cls._create_cluster_with_cp_options("BIG_QUEUE", ContinuousPagingOptions(max_queue_size=400))