blob: 211d76cf5cfd65585059988c399d24d478447455 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
from uuid import uuid4
from cassandra.query import FETCH_SIZE_UNSET
from cassandra.cqlengine.statements import BaseCQLStatement
from cassandra.cqlengine.management import sync_table, drop_table
from cassandra.cqlengine.statements import InsertStatement, UpdateStatement, SelectStatement, DeleteStatement, \
WhereClause
from cassandra.cqlengine.operators import EqualsOperator, LikeOperator
from cassandra.cqlengine.columns import Column
from tests.integration.cqlengine.base import BaseCassEngTestCase, TestQueryUpdateModel
from tests.integration.cqlengine import DEFAULT_KEYSPACE
from tests.integration import greaterthanorequalcass3_10, lessthandse69, TestCluster
from cassandra.cqlengine.connection import execute
class BaseStatementTest(unittest.TestCase):
def test_fetch_size(self):
""" tests that fetch_size is correctly set """
stmt = BaseCQLStatement('table', None, fetch_size=1000)
self.assertEqual(stmt.fetch_size, 1000)
stmt = BaseCQLStatement('table', None, fetch_size=None)
self.assertEqual(stmt.fetch_size, FETCH_SIZE_UNSET)
stmt = BaseCQLStatement('table', None)
self.assertEqual(stmt.fetch_size, FETCH_SIZE_UNSET)
class ExecuteStatementTest(BaseCassEngTestCase):
text = "text_for_db"
@classmethod
def setUpClass(cls):
super(ExecuteStatementTest, cls).setUpClass()
sync_table(TestQueryUpdateModel)
cls.table_name = '{0}.test_query_update_model'.format(DEFAULT_KEYSPACE)
@classmethod
def tearDownClass(cls):
super(ExecuteStatementTest, cls).tearDownClass()
drop_table(TestQueryUpdateModel)
def _verify_statement(self, original):
st = SelectStatement(self.table_name)
result = execute(st)
response = result[0]
for assignment in original.assignments:
self.assertEqual(response[assignment.field], assignment.value)
self.assertEqual(len(response), 7)
def test_insert_statement_execute(self):
"""
Test to verify the execution of BaseCQLStatements using connection.execute
@since 3.10
@jira_ticket PYTHON-505
@expected_result inserts a row in C*, updates the rows and then deletes
all the rows using BaseCQLStatements
@test_category data_types:object_mapper
"""
partition = uuid4()
cluster = 1
self._insert_statement(partition, cluster)
# Verifying update statement
where = [WhereClause('partition', EqualsOperator(), partition),
WhereClause('cluster', EqualsOperator(), cluster)]
st = UpdateStatement(self.table_name, where=where)
st.add_assignment(Column(db_field='count'), 2)
st.add_assignment(Column(db_field='text'), "text_for_db_update")
st.add_assignment(Column(db_field='text_set'), set(("foo_update", "bar_update")))
st.add_assignment(Column(db_field='text_list'), ["foo_update", "bar_update"])
st.add_assignment(Column(db_field='text_map'), {"foo": '3', "bar": '4'})
execute(st)
self._verify_statement(st)
# Verifying delete statement
execute(DeleteStatement(self.table_name, where=where))
self.assertEqual(TestQueryUpdateModel.objects.count(), 0)
@greaterthanorequalcass3_10
@lessthandse69
def test_like_operator(self):
"""
Test to verify the like operator works appropriately
@since 3.13
@jira_ticket PYTHON-512
@expected_result the expected row is read using LIKE
@test_category data_types:object_mapper
"""
cluster = TestCluster()
session = cluster.connect()
self.addCleanup(cluster.shutdown)
session.execute("""CREATE CUSTOM INDEX text_index ON {} (text)
USING 'org.apache.cassandra.index.sasi.SASIIndex';""".format(self.table_name))
self.addCleanup(session.execute, "DROP INDEX {}.text_index".format(DEFAULT_KEYSPACE))
partition = uuid4()
cluster = 1
self._insert_statement(partition, cluster)
ss = SelectStatement(self.table_name)
like_clause = "text_for_%"
ss.add_where(Column(db_field='text'), LikeOperator(), like_clause)
self.assertEqual(str(ss),
'SELECT * FROM {} WHERE "text" LIKE %(0)s'.format(self.table_name))
result = execute(ss)
self.assertEqual(result[0]["text"], self.text)
q = TestQueryUpdateModel.objects.filter(text__like=like_clause).allow_filtering()
self.assertEqual(q[0].text, self.text)
q = TestQueryUpdateModel.objects.filter(text__like=like_clause)
self.assertEqual(q[0].text, self.text)
def _insert_statement(self, partition, cluster):
# Verifying insert statement
st = InsertStatement(self.table_name)
st.add_assignment(Column(db_field='partition'), partition)
st.add_assignment(Column(db_field='cluster'), cluster)
st.add_assignment(Column(db_field='count'), 1)
st.add_assignment(Column(db_field='text'), self.text)
st.add_assignment(Column(db_field='text_set'), set(("foo", "bar")))
st.add_assignment(Column(db_field='text_list'), ["foo", "bar"])
st.add_assignment(Column(db_field='text_map'), {"foo": '1', "bar": '2'})
execute(st)
self._verify_statement(st)