blob: 5bd0d5ff93b01e8dfd7fdbabdb2dea59d275c8a4 [file] [log] [blame]
import collections
import re
import sys
import time
import traceback
import pytest
import threading
import logging
from flaky import flaky
from enum import Enum
from queue import Empty
from functools import partial
from multiprocessing import Process, Queue
from cassandra import ConsistencyLevel, InvalidRequest, WriteFailure
from cassandra.cluster import NoHostAvailable
from cassandra.concurrent import execute_concurrent_with_args
from cassandra.cluster import Cluster
from cassandra.query import SimpleStatement
from distutils.version import LooseVersion
from dtest import Tester, get_ip_from_node, create_ks
from tools.assertions import (assert_all, assert_crc_check_chance_equal,
assert_invalid, assert_none, assert_one,
assert_unavailable)
from tools.data import rows_to_list
from tools.misc import new_node
from tools.jmxutils import (JolokiaAgent, make_mbean)
since = pytest.mark.since
logger = logging.getLogger(__name__)
# CASSANDRA-10978. Migration wait (in seconds) to use in bootstrapping tests. Needed to handle
# pathological case of flushing schema keyspace for multiple data directories. See CASSANDRA-6696
# for multiple data directory changes and CASSANDRA-10421 for compaction logging that must be
# written.
MIGRATION_WAIT = 5
@flaky
@since('3.0')
class TestMaterializedViews(Tester):
"""
Test materialized views implementation.
@jira_ticket CASSANDRA-6477
@since 3.0
"""
def _rows_to_list(self, rows):
new_list = [list(row) for row in rows]
return new_list
def prepare(self, user_table=False, rf=1, options=None, nodes=3, install_byteman=False, **kwargs):
cluster = self.cluster
cluster.set_configuration_options({'enable_materialized_views': 'true'})
cluster.populate([nodes, 0], install_byteman=install_byteman)
if options:
cluster.set_configuration_options(values=options)
cluster.start()
node1 = cluster.nodelist()[0]
session = self.patient_cql_connection(node1, **kwargs)
create_ks(session, 'ks', rf)
if user_table:
session.execute(
("CREATE TABLE users (username varchar, password varchar, gender varchar, "
"session_token varchar, state varchar, birth_year bigint, "
"PRIMARY KEY (username));")
)
# create a materialized view
session.execute(("CREATE MATERIALIZED VIEW users_by_state AS "
"SELECT * FROM users WHERE STATE IS NOT NULL AND username IS NOT NULL "
"PRIMARY KEY (state, username)"))
return session
def update_view(self, session, query, flush, compact=False):
session.execute(query)
self._replay_batchlogs()
if flush:
self.cluster.flush()
if compact:
self.cluster.compact()
def _settle_nodes(self):
logger.debug("Settling all nodes")
stage_match = re.compile(r"(?P<name>\S+)\s+(?P<active>\d+)\s+(?P<pending>\d+)\s+(?P<completed>\d+)\s+(?P<blocked>\d+)\s+(?P<alltimeblocked>\d+)")
def _settled_stages(node):
(stdout, stderr, rc) = node.nodetool("tpstats")
lines = re.split("\n+", stdout)
for line in lines:
match = stage_match.match(line)
if match is not None:
active = int(match.group('active'))
pending = int(match.group('pending'))
if active != 0 or pending != 0:
logger.debug("%s - pool %s still has %d active and %d pending" % (node.name, match.group("name"), active, pending))
return False
return True
for node in self.cluster.nodelist():
if node.is_running():
node.nodetool("replaybatchlog")
attempts = 50 # 100 milliseconds per attempt, so 5 seconds total
while attempts > 0 and not _settled_stages(node):
time.sleep(0.1)
attempts -= 1
def _build_progress_table(self):
if self.cluster.version() >= '4':
return 'system.view_builds_in_progress'
else:
return 'system.views_builds_in_progress'
def _wait_for_view(self, ks, view):
logger.debug("waiting for view")
def _view_build_finished(node):
s = self.patient_exclusive_cql_connection(node)
query = "SELECT * FROM %s WHERE keyspace_name='%s' AND view_name='%s'" %\
(self._build_progress_table(), ks, view)
result = list(s.execute(query))
return len(result) == 0
for node in self.cluster.nodelist():
if node.is_running():
attempts = 50 # 1 sec per attempt, so 50 seconds total
while attempts > 0 and not _view_build_finished(node):
time.sleep(1)
attempts -= 1
if attempts <= 0:
raise RuntimeError("View {}.{} build not finished after 50 seconds.".format(ks, view))
def _wait_for_view_build_start(self, session, ks, view, wait_minutes=2):
"""Wait for the start of a MV build, ensuring that it has saved some progress"""
start = time.time()
while True:
try:
query = "SELECT COUNT(*) FROM %s WHERE keyspace_name='%s' AND view_name='%s'" %\
(self._build_progress_table(), ks, view)
result = list(session.execute(query))
assert 0 == result[0].count
except AssertionError:
break
elapsed = (time.time() - start) / 60
if elapsed > wait_minutes:
self.fail("The MV build hasn't started in 2 minutes.")
def _insert_data(self, session):
# insert data
insert_stmt = "INSERT INTO users (username, password, gender, state, birth_year) VALUES "
session.execute(insert_stmt + "('user1', 'ch@ngem3a', 'f', 'TX', 1968);")
session.execute(insert_stmt + "('user2', 'ch@ngem3b', 'm', 'CA', 1971);")
session.execute(insert_stmt + "('user3', 'ch@ngem3c', 'f', 'FL', 1978);")
session.execute(insert_stmt + "('user4', 'ch@ngem3d', 'm', 'TX', 1974);")
self._settle_nodes()
def _replay_batchlogs(self):
for node in self.cluster.nodelist():
if node.is_running():
logger.debug("Replaying batchlog on node {}".format(node.name))
node.nodetool("replaybatchlog")
# CASSANDRA-13069 - Ensure replayed mutations are removed from the batchlog
node_session = self.patient_exclusive_cql_connection(node)
result = list(node_session.execute("SELECT count(*) FROM system.batches;"))
assert result[0].count == 0
def _assert_view_meta(self, session, views, exists=True, nodes=2):
if exists:
assert_one(session, "SELECT COUNT(*) FROM system.built_views", [views])
if self.cluster.version() >= '3.11':
assert_one(session, "SELECT COUNT(*) FROM system_distributed.view_build_status", [views * nodes])
else:
assert_none(session, "SELECT * FROM system.built_views")
if self.cluster.version() >= '3.11':
assert_none(session, "SELECT * FROM system_distributed.view_build_status")
assert_none(session, "SELECT * FROM {}".format(self._build_progress_table()))
def test_view_metadata_cleanup(self):
"""
drop keyspace or view should clear built_views and view_build_status
"""
session = self.prepare(rf=2, nodes=2)
def populate_data(session, rows):
logger.debug("populate base data")
for v in range(rows):
session.execute("INSERT INTO t(k,c,a,b,e,f) VALUES({v},{v},{v},{v},{v},{v})".format(v=v))
def verify_data(session, rows, views):
logger.debug("verify view data")
for v in range(rows):
for view in range(views):
assert_one(session, "SELECT * FROM mv{} WHERE k={v} AND c={v}".format(view, v=v), [v, v, v, v, v, v])
def create_keyspace(session, ks="ks1", rf=2):
create_ks(session, ks, rf)
def create_table(session):
logger.debug("create base table")
session.execute("CREATE TABLE t (k int, c int, a int, b int, e int, f int, primary key(k, c))")
def create_views(session, views, keyspace="ks1"):
logger.debug("create view")
for view in range(views):
session.execute("CREATE MATERIALIZED VIEW mv{} AS SELECT * FROM t "
"WHERE k IS NOT NULL AND c IS NOT NULL PRIMARY KEY (c,k)".format(view),
timeout=60)
self._wait_for_view(keyspace, "mv{}".format(view))
def drop_keyspace(session, keyspace="ks1"):
logger.debug("drop keyspace {}".format(keyspace))
session.execute("DROP KEYSPACE IF EXISTS {}".format(keyspace),
timeout=60)
def drop_views(session, views):
logger.debug("drop all views")
for view in range(views):
session.execute("DROP MATERIALIZED VIEW IF EXISTS mv{}".format(view))
rows = 100
views = 5
create_keyspace(session)
create_table(session)
populate_data(session, rows)
create_views(session, views)
verify_data(session, rows, views)
self._assert_view_meta(session, views)
drop_keyspace(session)
self._assert_view_meta(session, views, exists=False)
create_keyspace(session)
create_table(session)
populate_data(session, rows)
create_views(session, views)
verify_data(session, rows, views)
self._assert_view_meta(session, views)
drop_views(session, views)
self._assert_view_meta(session, views, exists=False)
def test_create(self):
"""Test the materialized view creation"""
session = self.prepare(user_table=True)
result = list(session.execute(("SELECT * FROM system_schema.views "
"WHERE keyspace_name='ks' AND base_table_name='users' ALLOW FILTERING")))
assert len(result) == 1, "Expecting 1 materialized view == got" + str(result)
def test_gcgs_validation(self):
"""Verify that it's not possible to create or set a too low gc_grace_seconds on MVs"""
session = self.prepare(user_table=True)
# Shouldn't be able to alter the gc_grace_seconds of the base table to 0
assert_invalid(session,
"ALTER TABLE users WITH gc_grace_seconds = 0",
"Cannot alter gc_grace_seconds of the base table of a materialized view "
"to 0, since this value is used to TTL undelivered updates. Setting "
"gc_grace_seconds too low might cause undelivered updates to expire "
"before being replayed.")
# But can alter the gc_grace_seconds of the bease table to a value != 0
session.execute("ALTER TABLE users WITH gc_grace_seconds = 10")
# Shouldn't be able to alter the gc_grace_seconds of the MV to 0
assert_invalid(session,
"ALTER MATERIALIZED VIEW users_by_state WITH gc_grace_seconds = 0",
"Cannot alter gc_grace_seconds of a materialized view to 0, since "
"this value is used to TTL undelivered updates. Setting gc_grace_seconds "
"too low might cause undelivered updates to expire before being replayed.")
# Now let's drop MV
session.execute("DROP MATERIALIZED VIEW ks.users_by_state;")
# Now we should be able to set the gc_grace_seconds of the base table to 0
session.execute("ALTER TABLE users WITH gc_grace_seconds = 0")
# Now we shouldn't be able to create a new MV on this table
assert_invalid(session,
"CREATE MATERIALIZED VIEW users_by_state AS "
"SELECT * FROM users WHERE STATE IS NOT NULL AND username IS NOT NULL "
"PRIMARY KEY (state, username)",
"Cannot create materialized view 'users_by_state' for base table 'users' "
"with gc_grace_seconds of 0, since this value is used to TTL undelivered "
"updates. Setting gc_grace_seconds too low might cause undelivered updates"
" to expire before being replayed.")
def test_insert(self):
"""Test basic insertions"""
session = self.prepare(user_table=True)
self._insert_data(session)
result = list(session.execute("SELECT * FROM users;"))
assert len(result) == 4, "Expecting {} users, got {}".format(4 == len(result))
result = list(session.execute("SELECT * FROM users_by_state WHERE state='TX';"))
assert len(result) == 2, "Expecting {} users, got {}".format(2 == len(result))
result = list(session.execute("SELECT * FROM users_by_state WHERE state='CA';"))
assert len(result) == 1, "Expecting {} users, got {}".format(1 == len(result))
result = list(session.execute("SELECT * FROM users_by_state WHERE state='MA';"))
assert len(result) == 0, "Expecting {} users, got {}".format(0 == len(result))
def test_populate_mv_after_insert(self):
"""Test that a view is OK when created with existing data"""
session = self.prepare(consistency_level=ConsistencyLevel.QUORUM)
session.execute("CREATE TABLE t (id int PRIMARY KEY, v int)")
for i in range(1000):
session.execute("INSERT INTO t (id, v) VALUES ({v}, {v})".format(v=i))
session.execute(("CREATE MATERIALIZED VIEW t_by_v AS SELECT * FROM t WHERE v IS NOT NULL "
"AND id IS NOT NULL PRIMARY KEY (v, id)"))
logger.debug("wait for view to build")
self._wait_for_view("ks", "t_by_v")
logger.debug("wait that all batchlogs are replayed")
self._replay_batchlogs()
for i in range(1000):
assert_one(session, "SELECT * FROM t_by_v WHERE v = {}".format(i), [i, i])
@pytest.mark.xfail(reason="Should be addressed with CASSANDRA-15845")
@since('4.0')
def test_populate_mv_after_insert_wide_rows_version40(self):
self.test_populate_mv_after_insert_wide_rows()
@since('3.0', max_version='3.X')
def test_populate_mv_after_insert_wide_rows(self):
"""Test that a view is OK when created with existing data with wide rows"""
session = self.prepare(consistency_level=ConsistencyLevel.QUORUM)
session.execute("CREATE TABLE t (id int, v int, PRIMARY KEY (id, v))")
for i in range(5):
for j in range(10000):
session.execute("INSERT INTO t (id, v) VALUES ({}, {})".format(i, j))
session.execute(("CREATE MATERIALIZED VIEW t_by_v AS SELECT * FROM t WHERE v IS NOT NULL "
"AND id IS NOT NULL PRIMARY KEY (v, id)"))
logger.debug("wait for view to build")
self._wait_for_view("ks", "t_by_v")
logger.debug("wait that all batchlogs are replayed")
self._replay_batchlogs()
for i in range(5):
for j in range(10000):
assert_one(session, "SELECT * FROM t_by_v WHERE id = {} AND v = {}".format(i, j), [j, i])
def test_crc_check_chance(self):
"""Test that crc_check_chance parameter is properly populated after mv creation and update"""
session = self.prepare()
session.execute("CREATE TABLE t (id int PRIMARY KEY, v int)")
session.execute(("CREATE MATERIALIZED VIEW t_by_v AS SELECT * FROM t WHERE v IS NOT NULL "
"AND id IS NOT NULL PRIMARY KEY (v, id) WITH crc_check_chance = 0.5"))
assert_crc_check_chance_equal(session, "t_by_v", 0.5, view=True)
session.execute("ALTER MATERIALIZED VIEW t_by_v WITH crc_check_chance = 0.3")
assert_crc_check_chance_equal(session, "t_by_v", 0.3, view=True)
def test_prepared_statement(self):
"""Test basic insertions with prepared statement"""
session = self.prepare(user_table=True)
insertPrepared = session.prepare(
"INSERT INTO users (username, password, gender, state, birth_year) VALUES (?, ?, ?, ?, ?);"
)
selectPrepared = session.prepare(
"SELECT state, password, session_token FROM users_by_state WHERE state=?;"
)
# insert data
session.execute(insertPrepared.bind(('user1', 'ch@ngem3a', 'f', 'TX', 1968)))
session.execute(insertPrepared.bind(('user2', 'ch@ngem3b', 'm', 'CA', 1971)))
session.execute(insertPrepared.bind(('user3', 'ch@ngem3c', 'f', 'FL', 1978)))
session.execute(insertPrepared.bind(('user4', 'ch@ngem3d', 'm', 'TX', 1974)))
result = list(session.execute("SELECT * FROM users;"))
assert len(result) == 4, "Expecting {} users, got {}".format(4, len(result))
result = list(session.execute(selectPrepared.bind(['TX'])))
assert len(result) == 2, "Expecting {} users, got {}".format(2, len(result))
result = list(session.execute(selectPrepared.bind(['CA'])))
assert len(result) == 1, "Expecting {} users, got {}".format(1, len(result))
result = list(session.execute(selectPrepared.bind(['MA'])))
assert len(result) == 0, "Expecting {} users, got {}".format(0, len(result))
def test_immutable(self):
"""Test that a materialized view is immutable"""
session = self.prepare(user_table=True)
# cannot insert
assert_invalid(session, "INSERT INTO users_by_state (state, username) VALUES ('TX', 'user1');",
"Cannot directly modify a materialized view")
# cannot update
assert_invalid(session, "UPDATE users_by_state SET session_token='XYZ' WHERE username='user1' AND state = 'TX';",
"Cannot directly modify a materialized view")
# cannot delete a row
assert_invalid(session, "DELETE from users_by_state where state='TX';",
"Cannot directly modify a materialized view")
# cannot delete a cell
assert_invalid(session, "DELETE session_token from users_by_state where state='TX';",
"Cannot directly modify a materialized view")
# cannot alter a table
assert_invalid(session, "ALTER TABLE users_by_state ADD first_name varchar",
"Cannot use ALTER TABLE on Materialized View")
def test_drop_mv(self):
"""Test that we can drop a view properly"""
session = self.prepare(user_table=True)
# create another materialized view
session.execute(("CREATE MATERIALIZED VIEW users_by_birth_year AS "
"SELECT * FROM users WHERE birth_year IS NOT NULL AND "
"username IS NOT NULL PRIMARY KEY (birth_year, username)"))
result = list(session.execute(("SELECT * FROM system_schema.views "
"WHERE keyspace_name='ks' AND base_table_name='users' ALLOW FILTERING")))
assert len(result) == 2, "Expecting {} materialized view, got {}".format(2, len(result))
session.execute("DROP MATERIALIZED VIEW ks.users_by_state;")
result = list(session.execute(("SELECT * FROM system_schema.views "
"WHERE keyspace_name='ks' AND base_table_name='users' ALLOW FILTERING")))
assert len(result) == 1, "Expecting {} materialized view, got {}".format(1, len(result))
def test_drop_column(self):
"""Test that we cannot drop a column if it is used by a MV"""
session = self.prepare(user_table=True)
result = list(session.execute(("SELECT * FROM system_schema.views "
"WHERE keyspace_name='ks' AND base_table_name='users' ALLOW FILTERING")))
assert len(result) == 1, "Expecting {} materialized view, got {}".format(1, len(result))
assert_invalid(
session,
"ALTER TABLE ks.users DROP state;",
"Cannot drop column state on base table with materialized views."
)
def test_drop_table(self):
"""Test that we cannot drop a table without deleting its MVs first"""
session = self.prepare(user_table=True)
result = list(session.execute(("SELECT * FROM system_schema.views "
"WHERE keyspace_name='ks' AND base_table_name='users' ALLOW FILTERING")))
assert len(result) == 1, "Expecting {} materialized view, got {}".format(1, len(result))
assert_invalid(
session,
"DROP TABLE ks.users;",
"Cannot drop table when materialized views still depend on it"
)
result = list(session.execute(("SELECT * FROM system_schema.views "
"WHERE keyspace_name='ks' AND base_table_name='users' ALLOW FILTERING")))
assert len(result) == 1, "Expecting {} materialized view, got {}".format(1, len(result))
session.execute("DROP MATERIALIZED VIEW ks.users_by_state;")
session.execute("DROP TABLE ks.users;")
result = list(session.execute(("SELECT * FROM system_schema.views "
"WHERE keyspace_name='ks' AND base_table_name='users' ALLOW FILTERING")))
assert len(result) == 0, "Expecting {} materialized view, got {}".format(1, len(result))
def test_clustering_column(self):
"""Test that we can use clustering columns as primary key for a materialized view"""
session = self.prepare(consistency_level=ConsistencyLevel.QUORUM)
session.execute(("CREATE TABLE users (username varchar, password varchar, gender varchar, "
"session_token varchar, state varchar, birth_year bigint, "
"PRIMARY KEY (username, state, birth_year));"))
# create a materialized view that use a compound key
session.execute(("CREATE MATERIALIZED VIEW users_by_state_birth_year "
"AS SELECT * FROM users WHERE state IS NOT NULL AND birth_year IS NOT NULL "
"AND username IS NOT NULL PRIMARY KEY (state, birth_year, username)"))
session.cluster.control_connection.wait_for_schema_agreement()
self._insert_data(session)
result = list(session.execute("SELECT * FROM ks.users_by_state_birth_year WHERE state='TX'"))
assert len(result) == 2, "Expecting {} users, got {}".format(2, len(result))
result = list(session.execute("SELECT * FROM ks.users_by_state_birth_year WHERE state='TX' AND birth_year=1968"))
assert len(result) == 1, "Expecting {} users, got {}".format(1, len(result))
def _add_dc_after_mv_test(self, rf, nts):
"""
@jira_ticket CASSANDRA-10978
Add datacenter with configurable replication.
"""
session = self.prepare(rf=rf)
logger.debug("Creating schema")
session.execute("CREATE TABLE t (id int PRIMARY KEY, v int)")
session.execute(("CREATE MATERIALIZED VIEW t_by_v AS SELECT * FROM t "
"WHERE v IS NOT NULL AND id IS NOT NULL PRIMARY KEY (v, id)"))
logger.debug("Writing 1k to base")
for i in range(1000):
session.execute("INSERT INTO t (id, v) VALUES ({id}, {v})".format(id=i, v=-i))
logger.debug("Reading 1k from view")
for i in range(1000):
assert_one(session, "SELECT * FROM t_by_v WHERE v = {}".format(-i), [-i, i])
logger.debug("Reading 1k from base")
for i in range(1000):
assert_one(session, "SELECT * FROM t WHERE id = {}".format(i), [i, -i])
logger.debug("Bootstrapping new node in another dc")
node4 = new_node(self.cluster, data_center='dc2')
node4.start(wait_for_binary_proto=True, jvm_args=["-Dcassandra.migration_task_wait_in_seconds={}".format(MIGRATION_WAIT)])
logger.debug("Bootstrapping new node in another dc")
node5 = new_node(self.cluster, remote_debug_port='1414', data_center='dc2')
node5.start(jvm_args=["-Dcassandra.migration_task_wait_in_seconds={}".format(MIGRATION_WAIT)], wait_for_binary_proto=True)
if nts:
session.execute("alter keyspace ks with replication = {'class':'NetworkTopologyStrategy', 'dc1':1, 'dc2':1}")
session.execute("alter keyspace system_auth with replication = {'class':'NetworkTopologyStrategy', 'dc1':1, 'dc2':1}")
session.execute("alter keyspace system_traces with replication = {'class':'NetworkTopologyStrategy', 'dc1':1, 'dc2':1}")
node4.nodetool('rebuild dc1')
node5.nodetool('rebuild dc1')
cl = ConsistencyLevel.LOCAL_ONE if nts else ConsistencyLevel.ONE
session2 = self.patient_exclusive_cql_connection(node4, consistency_level=cl)
logger.debug("Verifying data from new node in view")
for i in range(1000):
assert_one(session2, "SELECT * FROM ks.t_by_v WHERE v = {}".format(-i), [-i, i])
logger.debug("Inserting 100 into base")
for i in range(1000, 1100):
session.execute("INSERT INTO t (id, v) VALUES ({id}, {v})".format(id=i, v=-i))
logger.debug("Verify 100 in view")
for i in range(1000, 1100):
assert_one(session, "SELECT * FROM t_by_v WHERE v = {}".format(-i), [-i, i])
@pytest.mark.resource_intensive
def test_add_dc_after_mv_simple_replication(self):
"""
@jira_ticket CASSANDRA-10634
Test that materialized views work as expected when adding a datacenter with SimpleStrategy.
"""
self._add_dc_after_mv_test(1, False)
@pytest.mark.resource_intensive
def test_add_dc_after_mv_network_replication(self):
"""
@jira_ticket CASSANDRA-10634
Test that materialized views work as expected when adding a datacenter with NetworkTopologyStrategy.
"""
self._add_dc_after_mv_test({'dc1': 1}, True)
@pytest.mark.resource_intensive
def test_add_node_after_mv(self):
"""
@jira_ticket CASSANDRA-10978
Test that materialized views work as expected when adding a node.
"""
session = self.prepare()
session.execute("CREATE TABLE t (id int PRIMARY KEY, v int)")
session.execute(("CREATE MATERIALIZED VIEW t_by_v AS SELECT * FROM t "
"WHERE v IS NOT NULL AND id IS NOT NULL PRIMARY KEY (v, id)"))
for i in range(1000):
session.execute("INSERT INTO t (id, v) VALUES ({id}, {v})".format(id=i, v=-i))
for i in range(1000):
assert_one(session, "SELECT * FROM t_by_v WHERE v = {}".format(-i), [-i, i])
node4 = new_node(self.cluster, data_center="dc1")
node4.start(wait_for_binary_proto=True, jvm_args=["-Dcassandra.migration_task_wait_in_seconds={}".format(MIGRATION_WAIT)])
session2 = self.patient_exclusive_cql_connection(node4)
"""
@jira_ticket CASSANDRA-12984
Assert that MVs are marked as build after bootstrap. Otherwise newly streamed MVs will be built again
"""
assert_one(session2, "SELECT count(*) FROM system.built_views WHERE keyspace_name = 'ks' AND view_name = 't_by_v'", [1])
for i in range(1000):
assert_one(session2, "SELECT * FROM ks.t_by_v WHERE v = {}".format(-i), [-i, i])
for i in range(1000, 1100):
session.execute("INSERT INTO t (id, v) VALUES ({id}, {v})".format(id=i, v=-i))
for i in range(1000, 1100):
assert_one(session, "SELECT * FROM t_by_v WHERE v = {}".format(-i), [-i, i])
def test_insert_during_range_movement_rf1(self):
self._base_test_insert_during_range_movement(rf=1)
def test_insert_during_range_movement_rf2(self):
self._base_test_insert_during_range_movement(rf=2)
def test_insert_during_range_movement_rf3(self):
self._base_test_insert_during_range_movement(rf=3)
def _base_test_insert_during_range_movement(self, rf):
"""
@jira_ticket CASSANDRA-14251
Test that materialized views replication work in the middle of a join
for different replication factors.
"""
session = self.prepare(rf=rf)
logger.debug("Creating table and view")
session.execute("CREATE TABLE t (id int PRIMARY KEY, v int)")
session.execute(("CREATE MATERIALIZED VIEW t_by_v AS SELECT * FROM t "
"WHERE v IS NOT NULL AND id IS NOT NULL PRIMARY KEY (v, id)"))
logger.debug("Starting new node4 in write survey mode")
node4 = new_node(self.cluster, data_center="dc1")
# Set batchlog.replay_timeout_seconds=1 so we can ensure batchlog will be replayed below
node4.start(wait_for_binary_proto=True, jvm_args=["-Dcassandra.write_survey=true",
"-Dcassandra.batchlog.replay_timeout_in_ms=1"])
logger.debug("Insert data while node4 is joining")
for i in range(1000):
session.execute("INSERT INTO t (id, v) VALUES ({id}, {v})".format(id=i, v=-i))
logger.debug("Finish joining node4")
node4.nodetool("join")
logger.debug('Replay batchlogs')
time.sleep(0.001) # Wait batchlog.replay_timeout_in_ms=1 (ms)
self._replay_batchlogs()
logger.debug("Verify data")
for i in range(1000):
assert_one(session, "SELECT * FROM t_by_v WHERE v = {}".format(-i), [-i, i])
@pytest.mark.resource_intensive
def test_add_node_after_wide_mv_with_range_deletions(self):
"""
@jira_ticket CASSANDRA-11670
Test that materialized views work with wide materialized views as expected when adding a node.
"""
session = self.prepare()
session.execute("CREATE TABLE t (id int, v int, PRIMARY KEY (id, v)) WITH compaction = { 'class': 'SizeTieredCompactionStrategy', 'enabled': 'false' }")
session.execute(("CREATE MATERIALIZED VIEW t_by_v AS SELECT * FROM t "
"WHERE v IS NOT NULL AND id IS NOT NULL PRIMARY KEY (v, id)"))
for i in range(10):
for j in range(100):
session.execute("INSERT INTO t (id, v) VALUES ({id}, {v})".format(id=i, v=j))
self.cluster.flush()
for i in range(10):
for j in range(100):
assert_one(session, "SELECT * FROM t WHERE id = {} and v = {}".format(i, j), [i, j])
assert_one(session, "SELECT * FROM t_by_v WHERE id = {} and v = {}".format(i, j), [j, i])
for i in range(10):
for j in range(100):
if j % 10 == 0:
session.execute("DELETE FROM t WHERE id = {} AND v >= {} and v < {}".format(i, j, j + 2))
self.cluster.flush()
for i in range(10):
for j in range(100):
if j % 10 == 0 or (j - 1) % 10 == 0:
assert_none(session, "SELECT * FROM t WHERE id = {} and v = {}".format(i, j))
assert_none(session, "SELECT * FROM t_by_v WHERE id = {} and v = {}".format(i, j))
else:
assert_one(session, "SELECT * FROM t WHERE id = {} and v = {}".format(i, j), [i, j])
assert_one(session, "SELECT * FROM t_by_v WHERE id = {} and v = {}".format(i, j), [j, i])
node4 = new_node(self.cluster, data_center="dc1")
node4.set_configuration_options(values={'max_mutation_size_in_kb': 20}) # CASSANDRA-11670
logger.debug("Start join at {}".format(time.strftime("%H:%M:%S")))
node4.start(wait_for_binary_proto=True, jvm_args=["-Dcassandra.migration_task_wait_in_seconds={}".format(MIGRATION_WAIT)])
session2 = self.patient_exclusive_cql_connection(node4)
for i in range(10):
for j in range(100):
if j % 10 == 0 or (j - 1) % 10 == 0:
assert_none(session2, "SELECT * FROM ks.t WHERE id = {} and v = {}".format(i, j))
assert_none(session2, "SELECT * FROM ks.t_by_v WHERE id = {} and v = {}".format(i, j))
else:
assert_one(session2, "SELECT * FROM ks.t WHERE id = {} and v = {}".format(i, j), [i, j])
assert_one(session2, "SELECT * FROM ks.t_by_v WHERE id = {} and v = {}".format(i, j), [j, i])
for i in range(10):
for j in range(100, 110):
session.execute("INSERT INTO t (id, v) VALUES ({id}, {v})".format(id=i, v=j))
for i in range(10):
for j in range(110):
if j < 100 and (j % 10 == 0 or (j - 1) % 10 == 0):
assert_none(session2, "SELECT * FROM ks.t WHERE id = {} and v = {}".format(i, j))
assert_none(session2, "SELECT * FROM ks.t_by_v WHERE id = {} and v = {}".format(i, j))
else:
assert_one(session2, "SELECT * FROM ks.t WHERE id = {} and v = {}".format(i, j), [i, j])
assert_one(session2, "SELECT * FROM ks.t_by_v WHERE id = {} and v = {}".format(i, j), [j, i])
@pytest.mark.resource_intensive
def test_add_node_after_very_wide_mv(self):
"""
@jira_ticket CASSANDRA-11670
Test that materialized views work with very wide materialized views as expected when adding a node.
"""
session = self.prepare()
session.execute("CREATE TABLE t (id int, v int, PRIMARY KEY (id, v))")
session.execute(("CREATE MATERIALIZED VIEW t_by_v AS SELECT * FROM t "
"WHERE v IS NOT NULL AND id IS NOT NULL PRIMARY KEY (v, id)"))
for i in range(5):
for j in range(5000):
session.execute("INSERT INTO t (id, v) VALUES ({id}, {v})".format(id=i, v=j))
self.cluster.flush()
for i in range(5):
for j in range(5000):
assert_one(session, "SELECT * FROM t_by_v WHERE id = {} and v = {}".format(i, j), [j, i])
node4 = new_node(self.cluster, data_center="dc1")
node4.set_configuration_options(values={'max_mutation_size_in_kb': 20}) # CASSANDRA-11670
logger.debug("Start join at {}".format(time.strftime("%H:%M:%S")))
node4.start(wait_for_binary_proto=True, jvm_args=["-Dcassandra.migration_task_wait_in_seconds={}".format(MIGRATION_WAIT)])
session2 = self.patient_exclusive_cql_connection(node4)
for i in range(5):
for j in range(5000):
assert_one(session2, "SELECT * FROM ks.t_by_v WHERE id = {} and v = {}".format(i, j), [j, i])
for i in range(5):
for j in range(5100):
session.execute("INSERT INTO t (id, v) VALUES ({id}, {v})".format(id=i, v=j))
for i in range(5):
for j in range(5100):
assert_one(session, "SELECT * FROM t_by_v WHERE id = {} and v = {}".format(i, j), [j, i])
@pytest.mark.resource_intensive
def test_add_write_survey_node_after_mv(self):
"""
@jira_ticket CASSANDRA-10621
@jira_ticket CASSANDRA-10978
Test that materialized views work as expected when adding a node in write survey mode.
"""
session = self.prepare()
session.execute("CREATE TABLE t (id int PRIMARY KEY, v int)")
session.execute(("CREATE MATERIALIZED VIEW t_by_v AS SELECT * FROM t "
"WHERE v IS NOT NULL AND id IS NOT NULL PRIMARY KEY (v, id)"))
for i in range(1000):
session.execute("INSERT INTO t (id, v) VALUES ({id}, {v})".format(id=i, v=-i))
for i in range(1000):
assert_one(session, "SELECT * FROM t_by_v WHERE v = {}".format(-i), [-i, i])
node4 = new_node(self.cluster, data_center="dc1")
node4.start(wait_for_binary_proto=True, jvm_args=["-Dcassandra.write_survey=true", "-Dcassandra.migration_task_wait_in_seconds={}".format(MIGRATION_WAIT)])
for i in range(1000, 1100):
session.execute("INSERT INTO t (id, v) VALUES ({id}, {v})".format(id=i, v=-i))
for i in range(1100):
assert_one(session, "SELECT * FROM t_by_v WHERE v = {}".format(-i), [-i, i])
def test_allow_filtering(self):
"""Test that allow filtering works as usual for a materialized view"""
session = self.prepare()
session.execute("CREATE TABLE t (id int PRIMARY KEY, v int, v2 text, v3 decimal)")
session.execute(("CREATE MATERIALIZED VIEW t_by_v AS SELECT * FROM t "
"WHERE v IS NOT NULL AND id IS NOT NULL PRIMARY KEY (v, id)"))
session.execute(("CREATE MATERIALIZED VIEW t_by_v2 AS SELECT * FROM t "
"WHERE v2 IS NOT NULL AND id IS NOT NULL PRIMARY KEY (v2, id)"))
for i in range(1000):
session.execute("INSERT INTO t (id, v, v2, v3) VALUES ({v}, {v}, 'a', 3.0)".format(v=i))
for i in range(1000):
assert_one(session, "SELECT * FROM t_by_v WHERE v = {v}".format(v=i), [i, i, 'a', 3.0])
rows = list(session.execute("SELECT * FROM t_by_v2 WHERE v2 = 'a'"))
assert len(rows) == 1000, "Expected 1000 rows but got {}".format(len(rows))
assert_invalid(session, "SELECT * FROM t_by_v WHERE v = 1 AND v2 = 'a'")
assert_invalid(session, "SELECT * FROM t_by_v2 WHERE v2 = 'a' AND v = 1")
for i in range(1000):
assert_one(
session,
"SELECT * FROM t_by_v WHERE v = {} AND v3 = 3.0 ALLOW FILTERING".format(i),
[i, i, 'a', 3.0]
)
assert_one(
session,
"SELECT * FROM t_by_v2 WHERE v2 = 'a' AND v = {} ALLOW FILTERING".format(i),
['a', i, i, 3.0]
)
def test_secondary_index(self):
"""Test that secondary indexes cannot be created on a materialized view"""
session = self.prepare()
session.execute("CREATE TABLE t (id int PRIMARY KEY, v int, v2 text, v3 decimal)")
session.execute(("CREATE MATERIALIZED VIEW t_by_v AS SELECT * FROM t "
"WHERE v IS NOT NULL AND id IS NOT NULL PRIMARY KEY (v, id)"))
assert_invalid(session, "CREATE INDEX ON t_by_v (v2)",
"Secondary indexes are not supported on materialized views")
def test_ttl(self):
"""
Test that TTL works as expected for a materialized view
@expected_result The TTL is propagated properly between tables.
"""
session = self.prepare()
session.execute("CREATE TABLE t (id int PRIMARY KEY, v int, v2 int, v3 int)")
session.execute(("CREATE MATERIALIZED VIEW t_by_v2 AS SELECT * FROM t "
"WHERE v2 IS NOT NULL AND id IS NOT NULL PRIMARY KEY (v2, id)"))
for i in range(100):
session.execute("INSERT INTO t (id, v, v2, v3) VALUES ({v}, {v}, {v}, {v}) USING TTL 10".format(v=i))
for i in range(100):
assert_one(session, "SELECT * FROM t_by_v2 WHERE v2 = {}".format(i), [i, i, i, i])
time.sleep(20)
rows = list(session.execute("SELECT * FROM t_by_v2"))
assert len(rows) == 0, "Expected 0 rows but got {}".format(len(rows))
def test_query_all_new_column(self):
"""
Test that a materialized view created with a 'SELECT *' works as expected when adding a new column
@expected_result The new column is present in the view.
"""
session = self.prepare(user_table=True)
self._insert_data(session)
assert_one(
session,
"SELECT * FROM users_by_state WHERE state = 'TX' AND username = 'user1'",
['TX', 'user1', 1968, 'f', 'ch@ngem3a', None]
)
session.execute("ALTER TABLE users ADD first_name varchar;")
results = list(session.execute("SELECT * FROM users_by_state WHERE state = 'TX' AND username = 'user1'"))
assert len(results) == 1
assert hasattr(results[0], 'first_name'), 'Column "first_name" not found'
assert_one(
session,
"SELECT * FROM users_by_state WHERE state = 'TX' AND username = 'user1'",
['TX', 'user1', 1968, None, 'f', 'ch@ngem3a', None]
)
def test_query_new_column(self):
"""
Test that a materialized view created with 'SELECT <col1, ...>' works as expected when adding a new column
@expected_result The new column is not present in the view.
"""
session = self.prepare(user_table=True)
session.execute(("CREATE MATERIALIZED VIEW users_by_state2 AS SELECT state, username FROM users "
"WHERE STATE IS NOT NULL AND USERNAME IS NOT NULL PRIMARY KEY (state, username)"))
self._insert_data(session)
assert_one(
session,
"SELECT * FROM users_by_state2 WHERE state = 'TX' AND username = 'user1'",
['TX', 'user1']
)
session.execute("ALTER TABLE users ADD first_name varchar;")
results = list(session.execute("SELECT * FROM users_by_state2 WHERE state = 'TX' AND username = 'user1'"))
assert len(results) == 1
assert not hasattr(results[0], 'first_name'), 'Column "first_name" found in view'
assert_one(
session,
"SELECT * FROM users_by_state2 WHERE state = 'TX' AND username = 'user1'",
['TX', 'user1']
)
def test_rename_column(self):
"""
Test that a materialized view created with a 'SELECT *' works as expected when renaming a column
@expected_result The column is also renamed in the view.
"""
session = self.prepare(user_table=True)
self._insert_data(session)
assert_one(
session,
"SELECT * FROM users_by_state WHERE state = 'TX' AND username = 'user1'",
['TX', 'user1', 1968, 'f', 'ch@ngem3a', None]
)
session.execute("ALTER TABLE users RENAME username TO user")
results = list(session.execute("SELECT * FROM users_by_state WHERE state = 'TX' AND user = 'user1'"))
assert len(results) == 1
assert hasattr(results[0], 'user'), 'Column "user" not found'
assert_one(
session,
"SELECT state, user, birth_year, gender FROM users_by_state WHERE state = 'TX' AND user = 'user1'",
['TX', 'user1', 1968, 'f']
)
def test_rename_column_atomicity(self):
"""
Test that column renaming is atomically done between a table and its materialized views
@jira_ticket CASSANDRA-12952
"""
session = self.prepare(nodes=1, user_table=True, install_byteman=True)
node = self.cluster.nodelist()[0]
self._insert_data(session)
assert_one(
session,
"SELECT * FROM users_by_state WHERE state = 'TX' AND username = 'user1'",
['TX', 'user1', 1968, 'f', 'ch@ngem3a', None]
)
# Rename a column with an injected byteman rule to kill the node after the first schema update
self.fixture_dtest_setup.allow_log_errors = True
script_version = '4x' if self.cluster.version() >= '4' else '3x'
node.byteman_submit(['./byteman/merge_schema_failure_{}.btm'.format(script_version)])
with pytest.raises(NoHostAvailable):
session.execute("ALTER TABLE users RENAME username TO user")
logger.debug('Restarting node')
node.stop()
node.start(wait_for_binary_proto=True)
session = self.patient_cql_connection(node, consistency_level=ConsistencyLevel.ONE)
# Both the table and its view should have the new schema after restart
assert_one(
session,
"SELECT * FROM ks.users WHERE state = 'TX' AND user = 'user1' ALLOW FILTERING",
['user1', 1968, 'f', 'ch@ngem3a', None, 'TX']
)
assert_one(
session,
"SELECT * FROM ks.users_by_state WHERE state = 'TX' AND user = 'user1'",
['TX', 'user1', 1968, 'f', 'ch@ngem3a', None]
)
def test_lwt(self):
"""Test that lightweight transaction behave properly with a materialized view"""
session = self.prepare()
session.execute("CREATE TABLE t (id int PRIMARY KEY, v int, v2 text, v3 decimal)")
session.execute(("CREATE MATERIALIZED VIEW t_by_v AS SELECT * FROM t "
"WHERE v IS NOT NULL AND id IS NOT NULL PRIMARY KEY (v, id)"))
logger.debug("Inserting initial data using IF NOT EXISTS")
for i in range(1000):
session.execute(
"INSERT INTO t (id, v, v2, v3) VALUES ({v}, {v}, 'a', 3.0) IF NOT EXISTS".format(v=i)
)
self._replay_batchlogs()
logger.debug("All rows should have been inserted")
for i in range(1000):
assert_one(
session,
"SELECT * FROM t_by_v WHERE v = {}".format(i),
[i, i, 'a', 3.0]
)
logger.debug("Tyring to UpInsert data with a different value using IF NOT EXISTS")
for i in range(1000):
v = i * 2
session.execute(
"INSERT INTO t (id, v, v2, v3) VALUES ({id}, {v}, 'a', 3.0) IF NOT EXISTS".format(id=i, v=v)
)
self._replay_batchlogs()
logger.debug("No rows should have changed")
for i in range(1000):
assert_one(
session,
"SELECT * FROM t_by_v WHERE v = {}".format(i),
[i, i, 'a', 3.0]
)
logger.debug("Update the 10 first rows with a different value")
for i in range(1000):
v = i + 2000
session.execute(
"UPDATE t SET v={v} WHERE id = {id} IF v < 10".format(id=i, v=v)
)
self._replay_batchlogs()
logger.debug("Verify that only the 10 first rows changed.")
results = list(session.execute("SELECT * FROM t_by_v;"))
assert len(results) == 1000
for i in range(1000):
v = i + 2000 if i < 10 else i
assert_one(
session,
"SELECT * FROM t_by_v WHERE v = {}".format(v),
[v, i, 'a', 3.0]
)
logger.debug("Deleting the first 10 rows")
for i in range(1000):
v = i + 2000
session.execute(
"DELETE FROM t WHERE id = {id} IF v = {v} ".format(id=i, v=v)
)
self._replay_batchlogs()
logger.debug("Verify that only the 10 first rows have been deleted.")
results = list(session.execute("SELECT * FROM t_by_v;"))
assert len(results) == 990
for i in range(10, 1000):
assert_one(
session,
"SELECT * FROM t_by_v WHERE v = {}".format(i),
[i, i, 'a', 3.0]
)
def test_interrupt_build_process(self):
"""Test that an interrupted MV build process is resumed as it should"""
options = {'hinted_handoff_enabled': False}
if self.cluster.version() >= '4':
options['concurrent_materialized_view_builders'] = 4
session = self.prepare(options=options, install_byteman=True)
node1, node2, node3 = self.cluster.nodelist()
logger.debug("Avoid premature MV build finalization with byteman")
for node in self.cluster.nodelist():
if self.cluster.version() >= '4':
node.byteman_submit(['./byteman/4.0/skip_view_build_finalization.btm'])
node.byteman_submit(['./byteman/4.0/skip_view_build_task_finalization.btm'])
else:
node.byteman_submit(['./byteman/pre4.0/skip_finish_view_build_status.btm'])
node.byteman_submit(['./byteman/pre4.0/skip_view_build_update_distributed.btm'])
session.execute("CREATE TABLE t (id int PRIMARY KEY, v int, v2 text, v3 decimal)")
logger.debug("Inserting initial data")
for i in range(10000):
session.execute(
"INSERT INTO t (id, v, v2, v3) VALUES ({v}, {v}, 'a', 3.0) IF NOT EXISTS".format(v=i)
)
logger.debug("Create a MV")
session.execute(("CREATE MATERIALIZED VIEW t_by_v AS SELECT * FROM t "
"WHERE v IS NOT NULL AND id IS NOT NULL PRIMARY KEY (v, id)"))
logger.debug("Wait and ensure the MV build has started. Waiting up to 2 minutes.")
self._wait_for_view_build_start(session, 'ks', 't_by_v', wait_minutes=2)
logger.debug("Stop the cluster. Interrupt the MV build process.")
self.cluster.stop()
logger.debug("Checking logs to verify that the view build tasks have been created")
for node in self.cluster.nodelist():
assert node.grep_log('Starting new view build', filename='debug.log')
assert not node.grep_log('Resuming view build', filename='debug.log')
node.mark_log(filename='debug.log')
logger.debug("Restart the cluster")
self.cluster.start()
session = self.patient_cql_connection(node1)
session.execute("USE ks")
logger.debug("MV shouldn't be built yet.")
assert len(list(session.execute("SELECT COUNT(*) FROM t_by_v"))) != 10000
logger.debug("Wait and ensure the MV build resumed. Waiting up to 2 minutes.")
self._wait_for_view("ks", "t_by_v")
logger.debug("Verify all data")
assert_one(session, "SELECT COUNT(*) FROM t_by_v", [10000])
for i in range(10000):
assert_one(
session,
"SELECT * FROM t_by_v WHERE v = {}".format(i),
[i, i, 'a', 3.0],
cl=ConsistencyLevel.ALL
)
logger.debug("Checking logs to verify that some view build tasks have been resumed")
for node in self.cluster.nodelist():
assert node.grep_log('Resuming view build', filename='debug.log')
@pytest.mark.skip(reason="Frequently fails in CI. Skipping until fixed as tracked by CASSANDRA-14148")
@since('4.0')
def test_drop_while_building(self):
"""Test that a parallel MV build is interrupted when the view is removed"""
session = self.prepare(options={'concurrent_materialized_view_builders': 4}, install_byteman=True)
session.execute("CREATE TABLE t (id int PRIMARY KEY, v int, v2 text, v3 decimal)")
logger.debug("Inserting initial data")
for i in range(5000):
session.execute("INSERT INTO t (id, v, v2, v3) VALUES ({v}, {v}, 'a', 3.0) IF NOT EXISTS".format(v=i))
logger.debug("Slowing down MV build with byteman")
for node in self.cluster.nodelist():
node.byteman_submit(['./byteman/4.0/view_builder_task_sleep.btm'])
logger.debug("Create a MV")
session.execute(("CREATE MATERIALIZED VIEW t_by_v AS SELECT * FROM t "
"WHERE v IS NOT NULL AND id IS NOT NULL PRIMARY KEY (v, id)"))
logger.debug("Wait and ensure the MV build has started. Waiting up to 2 minutes.")
self._wait_for_view_build_start(session, 'ks', 't_by_v', wait_minutes=2)
logger.debug("Drop the MV while it is still building")
session.execute("DROP MATERIALIZED VIEW t_by_v")
logger.debug("Verify that the build has been stopped before its finalization without errors")
for node in self.cluster.nodelist():
self.check_logs_for_errors()
assert not node.grep_log('Marking view', filename='debug.log')
assert node.grep_log('Stopping current view builder due to schema change', filename='debug.log')
logger.debug("Verify that the view has been removed")
failed = False
try:
session.execute("SELECT COUNT(*) FROM t_by_v")
except InvalidRequest:
failed = True
self.assertTrue(failed, "The view shouldn't be queryable")
self._assert_view_meta(session, views=1, exists=False)
logger.debug("Create the MV again")
session.execute(("CREATE MATERIALIZED VIEW t_by_v AS SELECT * FROM t "
"WHERE v IS NOT NULL AND id IS NOT NULL PRIMARY KEY (v, id)"))
logger.debug("Verify that the MV has been successfully created")
self._wait_for_view('ks', 't_by_v')
assert_one(session, "SELECT COUNT(*) FROM t_by_v", [5000])
@since('4.0')
def test_drop_with_stopped_build(self):
"""Test that MV whose build has been stopped with `nodetool stop` can be dropped"""
session = self.prepare(options={'concurrent_materialized_view_builders': 4}, install_byteman=True)
session.execute("CREATE TABLE t (id int PRIMARY KEY, v int, v2 text, v3 decimal)")
nodes = self.cluster.nodelist()
logger.debug("Inserting initial data")
for i in range(5000):
session.execute("INSERT INTO t (id, v, v2, v3) VALUES ({v}, {v}, 'a', 3.0) IF NOT EXISTS".format(v=i))
logger.debug("Slowing down MV build with byteman")
for node in nodes:
node.byteman_submit(['./byteman/4.0/view_builder_task_sleep.btm'])
logger.debug("Create a MV")
session.execute(("CREATE MATERIALIZED VIEW t_by_v AS SELECT * FROM t "
"WHERE v IS NOT NULL AND id IS NOT NULL PRIMARY KEY (v, id)"))
logger.debug("Wait and ensure the MV build has started. Waiting up to 2 minutes.")
self._wait_for_view_build_start(session, 'ks', 't_by_v', wait_minutes=2)
logger.debug("Stopping all running view build tasks with nodetool")
for node in nodes:
node.watch_log_for('Starting new view build for range', filename='debug.log', timeout=120)
node.nodetool('stop VIEW_BUILD')
logger.debug("Checking logs to verify that some view build tasks have been stopped")
for node in nodes:
node.watch_log_for('Stopped build for view', filename='debug.log', timeout=120)
node.watch_log_for('Compaction interrupted: View build', filename='system.log', timeout=120)
self.check_logs_for_errors()
logger.debug("Drop the MV while it is still building")
session.execute("DROP MATERIALIZED VIEW t_by_v")
logger.debug("Verify that the build has been stopped before its finalization without errors")
for node in nodes:
self.check_logs_for_errors()
assert not node.grep_log('Marking view', filename='debug.log')
assert node.grep_log('Stopping current view builder due to schema change', filename='debug.log')
logger.debug("Verify that the view has been removed")
failed = False
try:
session.execute("SELECT COUNT(*) FROM t_by_v")
except InvalidRequest:
failed = True
assert failed, "The view shouldn't be queryable"
logger.debug("Create the MV again")
session.execute(("CREATE MATERIALIZED VIEW t_by_v AS SELECT * FROM t "
"WHERE v IS NOT NULL AND id IS NOT NULL PRIMARY KEY (v, id)"))
logger.debug("Verify that the MV has been successfully created")
self._wait_for_view('ks', 't_by_v')
assert_one(session, "SELECT COUNT(*) FROM t_by_v", [5000])
@since('4.0')
def test_resume_stopped_build(self):
"""Test that MV builds stopped with `nodetool stop` are resumed after restart"""
session = self.prepare(options={'concurrent_materialized_view_builders': 4}, install_byteman=True)
session.execute("CREATE TABLE t (id int PRIMARY KEY, v int, v2 text, v3 decimal)")
nodes = self.cluster.nodelist()
logger.debug("Inserting initial data")
for i in range(5000):
session.execute("INSERT INTO t (id, v, v2, v3) VALUES ({v}, {v}, 'a', 3.0) IF NOT EXISTS".format(v=i))
logger.debug("Slowing down MV build with byteman")
for node in nodes:
node.byteman_submit(['./byteman/4.0/view_builder_task_sleep.btm'])
logger.debug("Create a MV")
session.execute(("CREATE MATERIALIZED VIEW t_by_v AS SELECT * FROM t "
"WHERE v IS NOT NULL AND id IS NOT NULL PRIMARY KEY (v, id)"))
logger.debug("Wait and ensure the MV build has started. Waiting up to 2 minutes.")
self._wait_for_view_build_start(session, 'ks', 't_by_v', wait_minutes=2)
logger.debug("Stopping all running view build tasks with nodetool")
for node in nodes:
node.watch_log_for('Starting new view build for range', filename='debug.log', timeout=120)
node.nodetool('stop VIEW_BUILD')
logger.debug("Checking logs to verify that some view build tasks have been stopped")
for node in nodes:
node.watch_log_for('Stopped build for view', filename='debug.log', timeout=120)
node.watch_log_for('Compaction interrupted: View build', filename='system.log', timeout=120)
node.watch_log_for('Interrupted build for view', filename='debug.log', timeout=120)
assert not node.grep_log('Marking view', filename='debug.log')
self.check_logs_for_errors()
logger.debug("Check that MV shouldn't be built yet.")
assert len(list(session.execute("SELECT COUNT(*) FROM t_by_v"))) != 5000
logger.debug("Restart the cluster")
self.cluster.stop()
marks = [node.mark_log() for node in nodes]
self.cluster.start()
session = self.patient_cql_connection(nodes[0])
logger.debug("Verify that the MV has been successfully created")
self._wait_for_view('ks', 't_by_v')
assert_one(session, "SELECT COUNT(*) FROM ks.t_by_v", [5000])
logger.debug("Checking logs to verify that the view build has been resumed and completed after restart")
for node, mark in zip(nodes, marks):
assert node.grep_log('Resuming view build', filename='debug.log', from_mark=mark)
assert node.grep_log('Marking view', filename='debug.log', from_mark=mark)
self.check_logs_for_errors()
@since('3.0')
def test_mv_with_default_ttl_with_flush(self):
self._test_mv_with_default_ttl(True)
@since('3.0')
def test_mv_with_default_ttl_without_flush(self):
self._test_mv_with_default_ttl(False)
def _test_mv_with_default_ttl(self, flush):
"""
Verify mv with default_time_to_live can be deleted properly using expired livenessInfo
@jira_ticket CASSANDRA-14071
"""
session = self.prepare(rf=3, nodes=3, options={'hinted_handoff_enabled': False}, consistency_level=ConsistencyLevel.QUORUM)
node1, node2, node3 = self.cluster.nodelist()
session.execute('USE ks')
logger.debug("MV with same key and unselected columns")
session.execute("CREATE TABLE t2 (k int, a int, b int, c int, primary key(k, a)) with default_time_to_live=600")
session.execute(("CREATE MATERIALIZED VIEW mv2 AS SELECT k,a,b FROM t2 "
"WHERE k IS NOT NULL AND a IS NOT NULL PRIMARY KEY (a, k)"))
session.cluster.control_connection.wait_for_schema_agreement()
self.update_view(session, "UPDATE t2 SET c=1 WHERE k=1 AND a=1;", flush)
assert_one(session, "SELECT k,a,b,c FROM t2", [1, 1, None, 1])
assert_one(session, "SELECT k,a,b FROM mv2", [1, 1, None])
self.update_view(session, "UPDATE t2 SET c=null WHERE k=1 AND a=1;", flush)
assert_none(session, "SELECT k,a,b,c FROM t2")
assert_none(session, "SELECT k,a,b FROM mv2")
self.update_view(session, "UPDATE t2 SET c=2 WHERE k=1 AND a=1;", flush)
assert_one(session, "SELECT k,a,b,c FROM t2", [1, 1, None, 2])
assert_one(session, "SELECT k,a,b FROM mv2", [1, 1, None])
self.update_view(session, "DELETE c FROM t2 WHERE k=1 AND a=1;", flush)
assert_none(session, "SELECT k,a,b,c FROM t2")
assert_none(session, "SELECT k,a,b FROM mv2")
if flush:
self.cluster.compact()
assert_none(session, "SELECT * FROM t2")
assert_none(session, "SELECT * FROM mv2")
# test with user-provided ttl
self.update_view(session, "INSERT INTO t2(k,a,b,c) VALUES(2,2,2,2) USING TTL 5", flush)
self.update_view(session, "UPDATE t2 USING TTL 100 SET c=1 WHERE k=2 AND a=2;", flush)
self.update_view(session, "UPDATE t2 USING TTL 50 SET c=2 WHERE k=2 AND a=2;", flush)
self.update_view(session, "DELETE c FROM t2 WHERE k=2 AND a=2;", flush)
time.sleep(5)
assert_none(session, "SELECT k,a,b,c FROM t2")
assert_none(session, "SELECT k,a,b FROM mv2")
if flush:
self.cluster.compact()
assert_none(session, "SELECT * FROM t2")
assert_none(session, "SELECT * FROM mv2")
logger.debug("MV with extra key")
session.execute("CREATE TABLE t (k int PRIMARY KEY, a int, b int) with default_time_to_live=600")
session.execute(("CREATE MATERIALIZED VIEW mv AS SELECT * FROM t "
"WHERE k IS NOT NULL AND a IS NOT NULL PRIMARY KEY (k, a)"))
session.cluster.control_connection.wait_for_schema_agreement()
self.update_view(session, "INSERT INTO t (k, a, b) VALUES (1, 1, 1);", flush)
assert_one(session, "SELECT * FROM t", [1, 1, 1])
assert_one(session, "SELECT * FROM mv", [1, 1, 1])
self.update_view(session, "INSERT INTO t (k, a, b) VALUES (1, 2, 1);", flush)
assert_one(session, "SELECT * FROM t", [1, 2, 1])
assert_one(session, "SELECT * FROM mv", [1, 2, 1])
self.update_view(session, "INSERT INTO t (k, a, b) VALUES (1, 3, 1);", flush)
assert_one(session, "SELECT * FROM t", [1, 3, 1])
assert_one(session, "SELECT * FROM mv", [1, 3, 1])
if flush:
self.cluster.compact()
assert_one(session, "SELECT * FROM t", [1, 3, 1])
assert_one(session, "SELECT * FROM mv", [1, 3, 1])
# user provided ttl
self.update_view(session, "UPDATE t USING TTL 50 SET a = 4 WHERE k = 1", flush)
assert_one(session, "SELECT * FROM t", [1, 4, 1])
assert_one(session, "SELECT * FROM mv", [1, 4, 1])
self.update_view(session, "UPDATE t USING TTL 40 SET a = 5 WHERE k = 1", flush)
assert_one(session, "SELECT * FROM t", [1, 5, 1])
assert_one(session, "SELECT * FROM mv", [1, 5, 1])
self.update_view(session, "UPDATE t USING TTL 30 SET a = 6 WHERE k = 1", flush)
assert_one(session, "SELECT * FROM t", [1, 6, 1])
assert_one(session, "SELECT * FROM mv", [1, 6, 1])
if flush:
self.cluster.compact()
assert_one(session, "SELECT * FROM t", [1, 6, 1])
assert_one(session, "SELECT * FROM mv", [1, 6, 1])
@flaky
@since('3.0')
def test_no_base_column_in_view_pk_complex_timestamp_with_flush(self):
self._test_no_base_column_in_view_pk_complex_timestamp(flush=True)
@pytest.mark.skip(reason="Frequently fails in CI. Skipping until fixed as tracked by CASSANDRA-14148")
@since('3.0')
def test_no_base_column_in_view_pk_complex_timestamp_without_flush(self):
self._test_no_base_column_in_view_pk_complex_timestamp(flush=False)
def _test_no_base_column_in_view_pk_complex_timestamp(self, flush):
"""
Able to shadow old view row if all columns in base are removed including unselected
Able to recreate view row if at least one selected column alive
@jira_ticket CASSANDRA-11500
"""
session = self.prepare(rf=3, nodes=3, options={'hinted_handoff_enabled': False}, consistency_level=ConsistencyLevel.QUORUM)
node1, node2, node3 = self.cluster.nodelist()
session.execute('USE ks')
session.execute("CREATE TABLE t (k int, c int, a int, b int, e int, f int, primary key(k, c))")
session.execute(("CREATE MATERIALIZED VIEW mv AS SELECT k,c,a,b FROM t "
"WHERE k IS NOT NULL AND c IS NOT NULL PRIMARY KEY (c, k)"))
session.cluster.control_connection.wait_for_schema_agreement()
# update unselected, view row should be alive
self.update_view(session, "UPDATE t USING TIMESTAMP 1 SET e=1 WHERE k=1 AND c=1;", flush)
assert_one(session, "SELECT * FROM t", [1, 1, None, None, 1, None])
assert_one(session, "SELECT * FROM mv", [1, 1, None, None])
# remove unselected, add selected column, view row should be alive
self.update_view(session, "UPDATE t USING TIMESTAMP 2 SET e=null, b=1 WHERE k=1 AND c=1;", flush)
assert_one(session, "SELECT * FROM t", [1, 1, None, 1, None, None])
assert_one(session, "SELECT * FROM mv", [1, 1, None, 1])
# remove selected column, view row is removed
self.update_view(session, "UPDATE t USING TIMESTAMP 2 SET e=null, b=null WHERE k=1 AND c=1;", flush)
assert_none(session, "SELECT * FROM t")
assert_none(session, "SELECT * FROM mv")
# update unselected with ts=3, view row should be alive
self.update_view(session, "UPDATE t USING TIMESTAMP 3 SET f=1 WHERE k=1 AND c=1;", flush)
assert_one(session, "SELECT * FROM t", [1, 1, None, None, None, 1])
assert_one(session, "SELECT * FROM mv", [1, 1, None, None])
# insert livenesssInfo, view row should be alive
self.update_view(session, "INSERT INTO t(k,c) VALUES(1,1) USING TIMESTAMP 3", flush)
assert_one(session, "SELECT * FROM t", [1, 1, None, None, None, 1])
assert_one(session, "SELECT * FROM mv", [1, 1, None, None])
# remove unselected, view row should be alive because of base livenessInfo alive
self.update_view(session, "UPDATE t USING TIMESTAMP 3 SET f=null WHERE k=1 AND c=1;", flush)
assert_one(session, "SELECT * FROM t", [1, 1, None, None, None, None])
assert_one(session, "SELECT * FROM mv", [1, 1, None, None])
# add selected column, view row should be alive
self.update_view(session, "UPDATE t USING TIMESTAMP 3 SET a=1 WHERE k=1 AND c=1;", flush)
assert_one(session, "SELECT * FROM t", [1, 1, 1, None, None, None])
assert_one(session, "SELECT * FROM mv", [1, 1, 1, None])
# update unselected, view row should be alive
self.update_view(session, "UPDATE t USING TIMESTAMP 4 SET f=1 WHERE k=1 AND c=1;", flush)
assert_one(session, "SELECT * FROM t", [1, 1, 1, None, None, 1])
assert_one(session, "SELECT * FROM mv", [1, 1, 1, None])
# delete with ts=3, view row should be alive due to unselected@ts4
self.update_view(session, "DELETE FROM t USING TIMESTAMP 3 WHERE k=1 AND c=1;", flush)
assert_one(session, "SELECT * FROM t", [1, 1, None, None, None, 1])
assert_one(session, "SELECT * FROM mv", [1, 1, None, None])
# remove unselected, view row should be removed
self.update_view(session, "UPDATE t USING TIMESTAMP 4 SET f=null WHERE k=1 AND c=1;", flush)
assert_none(session, "SELECT * FROM t")
assert_none(session, "SELECT * FROM mv")
# add selected with ts=7, view row is alive
self.update_view(session, "UPDATE t USING TIMESTAMP 7 SET b=1 WHERE k=1 AND c=1;", flush)
assert_one(session, "SELECT * FROM t", [1, 1, None, 1, None, None])
assert_one(session, "SELECT * FROM mv", [1, 1, None, 1])
# remove selected with ts=7, view row is dead
self.update_view(session, "UPDATE t USING TIMESTAMP 7 SET b=null WHERE k=1 AND c=1;", flush)
assert_none(session, "SELECT * FROM t")
assert_none(session, "SELECT * FROM mv")
# add selected with ts=5, view row is alive (selected column should not affects each other)
self.update_view(session, "UPDATE t USING TIMESTAMP 5 SET a=1 WHERE k=1 AND c=1;", flush)
assert_one(session, "SELECT * FROM t", [1, 1, 1, None, None, None])
assert_one(session, "SELECT * FROM mv", [1, 1, 1, None])
# add selected with ttl=20 (we apparently need a long ttl because the flushing etc that self.update_view does can take a long time)
self.update_view(session, "UPDATE t USING TTL 20 SET a=1 WHERE k=1 AND c=1;", flush)
assert_one(session, "SELECT * FROM t", [1, 1, 1, None, None, None])
assert_one(session, "SELECT * FROM mv", [1, 1, 1, None])
time.sleep(20)
# update unselected with ttl=10, view row should be alive
self.update_view(session, "UPDATE t USING TTL 20 SET f=1 WHERE k=1 AND c=1;", flush)
assert_one(session, "SELECT * FROM t", [1, 1, None, None, None, 1])
assert_one(session, "SELECT * FROM mv", [1, 1, None, None])
time.sleep(20)
# view row still alive due to base livenessInfo
assert_none(session, "SELECT * FROM t")
assert_none(session, "SELECT * FROM mv")
@since('3.0')
def test_base_column_in_view_pk_complex_timestamp_with_flush(self):
self._test_base_column_in_view_pk_complex_timestamp(flush=True)
@since('3.0')
def test_base_column_in_view_pk_complex_timestamp_without_flush(self):
self._test_base_column_in_view_pk_complex_timestamp(flush=False)
def _test_base_column_in_view_pk_complex_timestamp(self, flush):
"""
Able to shadow old view row with column ts greater than pk's ts and re-insert the view row
Able to shadow old view row with column ts smaller than pk's ts and re-insert the view row
@jira_ticket CASSANDRA-11500
"""
session = self.prepare(rf=3, nodes=3, options={'hinted_handoff_enabled': False}, consistency_level=ConsistencyLevel.QUORUM)
node1, node2, node3 = self.cluster.nodelist()
session.execute('USE ks')
session.execute("CREATE TABLE t (k int PRIMARY KEY, a int, b int)")
session.execute(("CREATE MATERIALIZED VIEW mv AS SELECT * FROM t "
"WHERE k IS NOT NULL AND a IS NOT NULL PRIMARY KEY (k, a)"))
session.cluster.control_connection.wait_for_schema_agreement()
# Set initial values TS=1
self.update_view(session, "INSERT INTO t (k, a, b) VALUES (1, 1, 1) USING TIMESTAMP 1;", flush)
assert_one(session, "SELECT * FROM t", [1, 1, 1])
assert_one(session, "SELECT * FROM mv", [1, 1, 1])
# increase b ts to 10
self.update_view(session, "UPDATE t USING TIMESTAMP 10 SET b = 2 WHERE k = 1;", flush)
assert_one(session, "SELECT k,a,b,writetime(b) FROM t", [1, 1, 2, 10])
assert_one(session, "SELECT k,a,b,writetime(b) FROM mv", [1, 1, 2, 10])
# switch entries. shadow a = 1, insert a = 2
self.update_view(session, "UPDATE t USING TIMESTAMP 2 SET a = 2 WHERE k = 1;", flush)
assert_one(session, "SELECT k,a,b,writetime(b) FROM t", [1, 2, 2, 10])
assert_one(session, "SELECT k,a,b,writetime(b) FROM mv", [1, 2, 2, 10])
# switch entries. shadow a = 2, insert a = 1
self.update_view(session, "UPDATE t USING TIMESTAMP 3 SET a = 1 WHERE k = 1;", flush)
assert_one(session, "SELECT k,a,b,writetime(b) FROM t", [1, 1, 2, 10])
assert_one(session, "SELECT k,a,b,writetime(b) FROM mv", [1, 1, 2, 10])
# switch entries. shadow a = 1, insert a = 2
self.update_view(session, "UPDATE t USING TIMESTAMP 4 SET a = 2 WHERE k = 1;", flush, compact=True)
assert_one(session, "SELECT k,a,b,writetime(b) FROM t", [1, 2, 2, 10])
assert_one(session, "SELECT k,a,b,writetime(b) FROM mv", [1, 2, 2, 10])
# able to shadow view row even if base-column in view pk's ts is smaller than row timestamp
# set row TS = 20, a@6, b@20
self.update_view(session, "DELETE FROM t USING TIMESTAMP 5 where k = 1;", flush)
assert_one(session, "SELECT k,a,b,writetime(b) FROM t", [1, None, 2, 10])
assert_none(session, "SELECT k,a,b,writetime(b) FROM mv")
self.update_view(session, "INSERT INTO t (k, a, b) VALUES (1, 1, 1) USING TIMESTAMP 6;", flush)
assert_one(session, "SELECT k,a,b,writetime(b) FROM t", [1, 1, 2, 10])
assert_one(session, "SELECT k,a,b,writetime(b) FROM mv", [1, 1, 2, 10])
self.update_view(session, "INSERT INTO t (k, b) VALUES (1, 1) USING TIMESTAMP 20;", flush)
assert_one(session, "SELECT k,a,b,writetime(b) FROM t", [1, 1, 1, 20])
assert_one(session, "SELECT k,a,b,writetime(b) FROM mv", [1, 1, 1, 20])
# switch entries. shadow a = 1, insert a = 2
self.update_view(session, "UPDATE t USING TIMESTAMP 7 SET a = 2 WHERE k = 1;", flush)
assert_one(session, "SELECT k,a,b,writetime(a),writetime(b) FROM t", [1, 2, 1, 7, 20])
assert_one(session, "SELECT k,a,b,writetime(b) FROM mv", [1, 2, 1, 20])
# switch entries. shadow a = 2, insert a = 1
self.update_view(session, "UPDATE t USING TIMESTAMP 8 SET a = 1 WHERE k = 1;", flush)
assert_one(session, "SELECT k,a,b,writetime(a),writetime(b) FROM t", [1, 1, 1, 8, 20])
assert_one(session, "SELECT k,a,b,writetime(b) FROM mv", [1, 1, 1, 20])
# create another view row
self.update_view(session, "INSERT INTO t (k, a, b) VALUES (2, 2, 2);", flush)
assert_one(session, "SELECT k,a,b FROM t WHERE k = 2", [2, 2, 2])
assert_one(session, "SELECT k,a,b FROM mv WHERE k = 2", [2, 2, 2])
# stop node2, node3
logger.debug('Shutdown node2')
node2.stop(wait_other_notice=True)
logger.debug('Shutdown node3')
node3.stop(wait_other_notice=True)
# shadow a = 1, create a = 2
query = SimpleStatement("UPDATE t USING TIMESTAMP 9 SET a = 2 WHERE k = 1", consistency_level=ConsistencyLevel.ONE)
self.update_view(session, query, flush)
# shadow (a=2, k=2) after 3 second
query = SimpleStatement("UPDATE t USING TTL 3 SET a = 2 WHERE k = 2", consistency_level=ConsistencyLevel.ONE)
self.update_view(session, query, flush)
logger.debug('Starting node2')
node2.start(wait_for_binary_proto=True)
logger.debug('Starting node3')
node3.start(wait_for_binary_proto=True)
# For k = 1 & a = 1, We should get a digest mismatch of tombstones and repaired
query = SimpleStatement("SELECT * FROM mv WHERE k = 1 AND a = 1", consistency_level=ConsistencyLevel.ALL)
result = session.execute(query, trace=True)
self.check_trace_events(result.get_query_trace(), True)
assert 0 == len(result.current_rows)
# For k = 1 & a = 1, second time no digest mismatch
result = session.execute(query, trace=True)
self.check_trace_events(result.get_query_trace(), False)
assert_none(session, "SELECT * FROM mv WHERE k = 1 AND a = 1")
assert 0 == len(result.current_rows)
# For k = 1 & a = 2, We should get a digest mismatch of data and repaired for a = 2
query = SimpleStatement("SELECT * FROM mv WHERE k = 1 AND a = 2", consistency_level=ConsistencyLevel.ALL)
result = session.execute(query, trace=True)
self.check_trace_events(result.get_query_trace(), True)
assert 1 == len(result.current_rows)
# For k = 1 & a = 2, second time no digest mismatch
result = session.execute(query, trace=True)
self.check_trace_events(result.get_query_trace(), False)
assert 1 == len(result.current_rows)
assert_one(session, "SELECT k,a,b,writetime(b) FROM mv WHERE k = 1", [1, 2, 1, 20])
time.sleep(3)
# For k = 2 & a = 2, We should get a digest mismatch of expired and repaired
query = SimpleStatement("SELECT * FROM mv WHERE k = 2 AND a = 2", consistency_level=ConsistencyLevel.ALL)
result = session.execute(query, trace=True)
self.check_trace_events(result.get_query_trace(), True)
logger.debug(result.current_rows)
assert 0 == len(result.current_rows)
# For k = 2 & a = 2, second time no digest mismatch
result = session.execute(query, trace=True)
self.check_trace_events(result.get_query_trace(), False)
assert 0 == len(result.current_rows)
@since('3.0')
def test_expired_liveness_with_limit_rf1_nodes1(self):
self._test_expired_liveness_with_limit(rf=1, nodes=1)
@since('3.0')
def test_expired_liveness_with_limit_rf1_nodes3(self):
self._test_expired_liveness_with_limit(rf=1, nodes=3)
@since('3.0')
def test_expired_liveness_with_limit_rf3(self):
self._test_expired_liveness_with_limit(rf=3, nodes=3)
def _test_expired_liveness_with_limit(self, rf, nodes):
"""
Test MV with expired liveness limit is properly handled
@jira_ticket CASSANDRA-13883
"""
session = self.prepare(rf=rf, nodes=nodes, options={'hinted_handoff_enabled': False}, consistency_level=ConsistencyLevel.QUORUM)
node1 = self.cluster.nodelist()[0]
session.execute('USE ks')
session.execute("CREATE TABLE t (k int PRIMARY KEY, a int, b int)")
session.execute(("CREATE MATERIALIZED VIEW mv AS SELECT * FROM t "
"WHERE k IS NOT NULL AND a IS NOT NULL PRIMARY KEY (k, a)"))
session.cluster.control_connection.wait_for_schema_agreement()
for k in range(100):
session.execute("INSERT INTO t (k, a, b) VALUES ({}, {}, {})".format(k, k, k))
# generate view row with expired liveness except for row 50 and 99
for k in range(100):
if k == 50 or k == 99:
continue
session.execute("DELETE a FROM t where k = {};".format(k))
# there should be 2 live data
assert_one(session, "SELECT k,a,b FROM mv limit 1", [50, 50, 50])
assert_all(session, "SELECT k,a,b FROM mv limit 2", [[50, 50, 50], [99, 99, 99]])
assert_all(session, "SELECT k,a,b FROM mv", [[50, 50, 50], [99, 99, 99]])
# verify IN
keys = range(100)
assert_one(session, "SELECT k,a,b FROM mv WHERE k in ({}) limit 1".format(', '.join(str(x) for x in keys)),
[50, 50, 50])
assert_all(session, "SELECT k,a,b FROM mv WHERE k in ({}) limit 2".format(', '.join(str(x) for x in keys)),
[[50, 50, 50], [99, 99, 99]])
assert_all(session, "SELECT k,a,b FROM mv WHERE k in ({})".format(', '.join(str(x) for x in keys)),
[[50, 50, 50], [99, 99, 99]])
# verify fetch size
session.default_fetch_size = 1
assert_one(session, "SELECT k,a,b FROM mv limit 1", [50, 50, 50])
assert_all(session, "SELECT k,a,b FROM mv limit 2", [[50, 50, 50], [99, 99, 99]])
assert_all(session, "SELECT k,a,b FROM mv", [[50, 50, 50], [99, 99, 99]])
@since('3.0')
def test_base_column_in_view_pk_commutative_tombstone_with_flush(self):
self._test_base_column_in_view_pk_commutative_tombstone_(flush=True)
@since('3.0')
def test_base_column_in_view_pk_commutative_tombstone_without_flush(self):
self._test_base_column_in_view_pk_commutative_tombstone_(flush=False)
def _test_base_column_in_view_pk_commutative_tombstone_(self, flush):
"""
view row deletion should be commutative with newer view livenessInfo, otherwise deleted columns may be resurrected.
@jira_ticket CASSANDRA-13409
"""
session = self.prepare(rf=3, nodes=3, options={'hinted_handoff_enabled': False}, consistency_level=ConsistencyLevel.QUORUM)
node1 = self.cluster.nodelist()[0]
session.execute('USE ks')
session.execute("CREATE TABLE t (id int PRIMARY KEY, v int, v2 text, v3 decimal)")
session.execute(("CREATE MATERIALIZED VIEW t_by_v AS SELECT * FROM t "
"WHERE v IS NOT NULL AND id IS NOT NULL PRIMARY KEY (v,id)"))
session.cluster.control_connection.wait_for_schema_agreement()
for node in self.cluster.nodelist():
node.nodetool("disableautocompaction")
# sstable 1, Set initial values TS=1
self.update_view(session, "INSERT INTO t (id, v, v2, v3) VALUES (1, 1, 'a', 3.0) USING TIMESTAMP 1", flush)
assert_one(session, "SELECT * FROM t_by_v", [1, 1, 'a', 3.0])
# sstable 2, change v's value and TS=2, tombstones v=1 and adds v=0 record
self.update_view(session, "DELETE FROM t USING TIMESTAMP 2 WHERE id = 1;", flush)
assert_none(session, "SELECT * FROM t_by_v")
assert_none(session, "SELECT * FROM t")
# sstable 3, tombstones of mv created by base deletion should remain.
self.update_view(session, "INSERT INTO t (id, v) VALUES (1, 1) USING TIMESTAMP 3", flush)
assert_one(session, "SELECT * FROM t_by_v", [1, 1, None, None])
assert_one(session, "SELECT * FROM t", [1, 1, None, None])
# sstable 4, shadow view row (id=1, v=1), insert (id=1, v=2, ts=4)
self.update_view(session, "UPDATE t USING TIMESTAMP 4 set v = 2 WHERE id = 1;", flush)
assert_one(session, "SELECT * FROM t_by_v", [2, 1, None, None])
assert_one(session, "SELECT * FROM t", [1, 2, None, None])
# sstable 5, shadow view row (id=1, v=2), insert (id=1, v=1 ts=5)
self.update_view(session, "UPDATE t USING TIMESTAMP 5 set v = 1 WHERE id = 1;", flush)
assert_one(session, "SELECT * FROM t_by_v", [1, 1, None, None])
assert_one(session, "SELECT * FROM t", [1, 1, None, None]) # data deleted by row-tombstone@2 should not resurrect
if flush:
self.cluster.compact()
assert_one(session, "SELECT * FROM t_by_v", [1, 1, None, None])
assert_one(session, "SELECT * FROM t", [1, 1, None, None]) # data deleted by row-tombstone@2 should not resurrect
# shadow view row (id=1, v=1)
self.update_view(session, "UPDATE t USING TIMESTAMP 5 set v = null WHERE id = 1;", flush)
assert_none(session, "SELECT * FROM t_by_v")
assert_one(session, "SELECT * FROM t", [1, None, None, None])
def test_view_tombstone(self):
"""
Test that a materialized views properly tombstone
@jira_ticket CASSANDRA-10261
@jira_ticket CASSANDRA-10910
"""
self.prepare(rf=3, options={'hinted_handoff_enabled': False})
node1, node2, node3 = self.cluster.nodelist()
session = self.patient_exclusive_cql_connection(node1)
session.max_trace_wait = 120
session.execute('USE ks')
session.execute("CREATE TABLE t (id int PRIMARY KEY, v int, v2 text, v3 decimal)")
session.execute(("CREATE MATERIALIZED VIEW t_by_v AS SELECT * FROM t "
"WHERE v IS NOT NULL AND id IS NOT NULL PRIMARY KEY (v,id)"))
session.cluster.control_connection.wait_for_schema_agreement()
# Set initial values TS=0, verify
session.execute(SimpleStatement("INSERT INTO t (id, v, v2, v3) VALUES (1, 1, 'a', 3.0) USING TIMESTAMP 0",
consistency_level=ConsistencyLevel.ALL))
self._replay_batchlogs()
assert_one(
session,
"SELECT * FROM t_by_v WHERE v = 1",
[1, 1, 'a', 3.0]
)
session.execute(SimpleStatement("INSERT INTO t (id, v2) VALUES (1, 'b') USING TIMESTAMP 1",
consistency_level=ConsistencyLevel.ALL))
self._replay_batchlogs()
assert_one(
session,
"SELECT * FROM t_by_v WHERE v = 1",
[1, 1, 'b', 3.0]
)
# change v's value and TS=3, tombstones v=1 and adds v=0 record
session.execute(SimpleStatement("UPDATE t USING TIMESTAMP 3 SET v = 0 WHERE id = 1",
consistency_level=ConsistencyLevel.ALL))
self._replay_batchlogs()
assert_none(session, "SELECT * FROM t_by_v WHERE v = 1")
logger.debug('Shutdown node2')
node2.stop(wait_other_notice=True)
session.execute(SimpleStatement("UPDATE t USING TIMESTAMP 4 SET v = 1 WHERE id = 1",
consistency_level=ConsistencyLevel.QUORUM))
self._replay_batchlogs()
assert_one(
session,
"SELECT * FROM t_by_v WHERE v = 1",
[1, 1, 'b', 3.0]
)
node2.start(wait_for_binary_proto=True)
# We should get a digest mismatch
query = SimpleStatement("SELECT * FROM t_by_v WHERE v = 1",
consistency_level=ConsistencyLevel.ALL)
result = session.execute(query, trace=True)
self.check_trace_events(result.get_query_trace(), True)
# We should not get a digest mismatch the second time
query = SimpleStatement("SELECT * FROM t_by_v WHERE v = 1", consistency_level=ConsistencyLevel.ALL)
result = session.execute(query, trace=True)
self.check_trace_events(result.get_query_trace(), False)
# Verify values one last time
assert_one(
session,
"SELECT * FROM t_by_v WHERE v = 1",
[1, 1, 'b', 3.0],
cl=ConsistencyLevel.ALL
)
def check_trace_events(self, trace, expect_digest):
# we should see multiple requests get enqueued prior to index scan
# execution happening
# Look for messages like:
# 4.0+ Digest mismatch: Mismatch for key DecoratedKey
# <4.0 Digest mismatch: org.apache.cassandra.service.DigestMismatchException: Mismatch for key DecoratedKey
regex = r"Digest mismatch: ([a-zA-Z.]+:\s)?Mismatch for key DecoratedKey"
for event in trace.events:
desc = event.description
match = re.match(regex, desc)
if match:
if expect_digest:
break
else:
self.fail("Encountered digest mismatch when we shouldn't")
else:
if expect_digest:
self.fail("Didn't find digest mismatch")
def test_simple_repair_by_base(self):
self._simple_repair_test(repair_base=True)
def test_simple_repair_by_view(self):
self._simple_repair_test(repair_view=True)
def _simple_repair_test(self, repair_base=False, repair_view=False):
"""
Test that a materialized view are consistent after a simple repair.
"""
session = self.prepare(rf=3, options={'hinted_handoff_enabled': False})
node1, node2, node3 = self.cluster.nodelist()
session.execute("CREATE TABLE t (id int PRIMARY KEY, v int, v2 text, v3 decimal)")
session.execute(("CREATE MATERIALIZED VIEW t_by_v AS SELECT * FROM t "
"WHERE v IS NOT NULL AND id IS NOT NULL PRIMARY KEY (v, id)"))
session.cluster.control_connection.wait_for_schema_agreement()
logger.debug('Shutdown node2')
node2.stop(wait_other_notice=True)
for i in range(1000):
session.execute("INSERT INTO t (id, v, v2, v3) VALUES ({v}, {v}, 'a', 3.0)".format(v=i))
self._replay_batchlogs()
logger.debug('Verify the data in the MV with CL=ONE')
for i in range(1000):
assert_one(
session,
"SELECT * FROM t_by_v WHERE v = {}".format(i),
[i, i, 'a', 3.0]
)
logger.debug('Verify the data in the MV with CL=ALL. All should be unavailable.')
for i in range(1000):
statement = SimpleStatement(
"SELECT * FROM t_by_v WHERE v = {}".format(i),
consistency_level=ConsistencyLevel.ALL
)
assert_unavailable(
session.execute,
statement
)
logger.debug('Start node2, and repair')
node2.start(wait_for_binary_proto=True)
if repair_base:
node1.nodetool("repair ks t")
if repair_view:
node1.nodetool("repair ks t_by_v")
logger.debug('Verify the data in the MV with CL=ALL. All should be available now and no digest mismatch')
for i in range(1000):
query = SimpleStatement(
"SELECT * FROM t_by_v WHERE v = {}".format(i),
consistency_level=ConsistencyLevel.ALL
)
result = session.execute(query, trace=True)
self.check_trace_events(result.get_query_trace(), False)
assert self._rows_to_list(result.current_rows), [[i, i, 'a' == 3.0]]
def test_base_replica_repair(self):
self._base_replica_repair_test()
def test_base_replica_repair_with_contention(self):
"""
Test repair does not fail when there is MV lock contention
@jira_ticket CASSANDRA-12905
"""
self._base_replica_repair_test(fail_mv_lock=True)
def _base_replica_repair_test(self, fail_mv_lock=False):
"""
Test that a materialized view are consistent after the repair of the base replica.
"""
self.prepare(rf=3)
node1, node2, node3 = self.cluster.nodelist()
session = self.patient_exclusive_cql_connection(node1)
session.execute('USE ks')
session.execute("CREATE TABLE t (id int PRIMARY KEY, v int, v2 text, v3 decimal)")
session.execute(("CREATE MATERIALIZED VIEW t_by_v AS SELECT * FROM t "
"WHERE v IS NOT NULL AND id IS NOT NULL PRIMARY KEY (v, id)"))
session.cluster.control_connection.wait_for_schema_agreement()
logger.debug('Write initial data')
for i in range(1000):
session.execute("INSERT INTO t (id, v, v2, v3) VALUES ({v}, {v}, 'a', 3.0)".format(v=i))
self._replay_batchlogs()
logger.debug('Verify the data in the MV with CL=ALL')
for i in range(1000):
assert_one(
session,
"SELECT * FROM t_by_v WHERE v = {}".format(i),
[i, i, 'a', 3.0],
cl=ConsistencyLevel.ALL
)
logger.debug('Shutdown node1')
node1.stop(wait_other_notice=True)
logger.debug('Delete node1 data')
node1.clear(clear_all=True)
jvm_args = []
if fail_mv_lock:
if self.cluster.version() >= LooseVersion('3.10'): # CASSANDRA-10134
jvm_args = ['-Dcassandra.allow_unsafe_replace=true', '-Dcassandra.replace_address={}'.format(node1.address())]
jvm_args.append("-Dcassandra.test.fail_mv_locks_count=1000")
# this should not make Keyspace.apply throw WTE on failure to acquire lock
node1.set_configuration_options(values={'write_request_timeout_in_ms': 100})
logger.debug('Restarting node1 with jvm_args={}'.format(jvm_args))
node1.start(wait_for_binary_proto=True, jvm_args=jvm_args)
logger.debug('Shutdown node2 and node3')
node2.stop(wait_other_notice=True)
node3.stop(wait_other_notice=True)
session = self.patient_exclusive_cql_connection(node1)
session.execute('USE ks')
logger.debug('Verify that there is no data on node1')
for i in range(1000):
assert_none(
session,
"SELECT * FROM t_by_v WHERE v = {}".format(i)
)
logger.debug('Restarting node2 and node3')
node2.start(wait_for_binary_proto=True)
node3.start(wait_for_binary_proto=True)
# Just repair the base replica
logger.debug('Starting repair on node1')
node1.nodetool("repair ks t")
logger.debug('Verify data with cl=ALL')
for i in range(1000):
assert_one(
session,
"SELECT * FROM t_by_v WHERE v = {}".format(i),
[i, i, 'a', 3.0]
)
@pytest.mark.resource_intensive
def test_complex_repair(self):
"""
Test that a materialized view are consistent after a more complex repair.
"""
session = self.prepare(rf=5, options={'hinted_handoff_enabled': False}, nodes=5)
node1, node2, node3, node4, node5 = self.cluster.nodelist()
# we create the base table with gc_grace_seconds=5 so batchlog will expire after 5 seconds
session.execute("CREATE TABLE ks.t (id int PRIMARY KEY, v int, v2 text, v3 decimal)"
"WITH gc_grace_seconds = 5")
session.execute(("CREATE MATERIALIZED VIEW ks.t_by_v AS SELECT * FROM t "
"WHERE v IS NOT NULL AND id IS NOT NULL PRIMARY KEY (v, id)"))
session.cluster.control_connection.wait_for_schema_agreement()
logger.debug('Shutdown node2 and node3')
node2.stop()
node3.stop(wait_other_notice=True)
logger.debug('Write initial data to node1 (will be replicated to node4 and node5)')
for i in range(1000):
session.execute("INSERT INTO ks.t (id, v, v2, v3) VALUES ({v}, {v}, 'a', 3.0)".format(v=i))
logger.debug('Verify the data in the MV on node1 with CL=ONE')
for i in range(1000):
assert_one(
session,
"SELECT * FROM ks.t_by_v WHERE v = {}".format(i),
[i, i, 'a', 3.0]
)
logger.debug('Close connection to node1')
session.cluster.shutdown()
logger.debug('Shutdown node1, node4 and node5')
node1.stop()
node4.stop()
node5.stop()
logger.debug('Start nodes 2 and 3')
node2.start()
node3.start(wait_for_binary_proto=True)
session2 = self.patient_cql_connection(node2)
logger.debug('Verify the data in the MV on node2 with CL=ONE. No rows should be found.')
for i in range(1000):
assert_none(
session2,
"SELECT * FROM ks.t_by_v WHERE v = {}".format(i)
)
logger.debug('Write new data in node2 and node3 that overlap those in node1, node4 and node5')
for i in range(1000):
# we write i*2 as value, instead of i
session2.execute("INSERT INTO ks.t (id, v, v2, v3) VALUES ({v}, {v}, 'a', 3.0)".format(v=i * 2))
logger.debug('Verify the new data in the MV on node2 with CL=ONE')
for i in range(1000):
v = i * 2
assert_one(
session2,
"SELECT * FROM ks.t_by_v WHERE v = {}".format(v),
[v, v, 'a', 3.0]
)
logger.debug('Wait for batchlogs to expire from node2 and node3')
time.sleep(5)
logger.debug('Start remaining nodes')
node1.start(wait_for_binary_proto=True)
node4.start(wait_for_binary_proto=True)
node5.start(wait_for_binary_proto=True)
session = self.patient_cql_connection(node1)
logger.debug('Read data from MV at QUORUM (old data should be returned)')
for i in range(1000):
assert_one(
session,
"SELECT * FROM ks.t_by_v WHERE v = {}".format(i),
[i, i, 'a', 3.0],
cl=ConsistencyLevel.QUORUM
)
logger.debug('Run global repair on node1')
node1.repair()
logger.debug('Read data from MV at quorum (new data should be returned after repair)')
for i in range(1000):
v = i * 2
assert_one(
session,
"SELECT * FROM ks.t_by_v WHERE v = {}".format(v),
[v, v, 'a', 3.0],
cl=ConsistencyLevel.QUORUM
)
@pytest.mark.resource_intensive
def test_throttled_partition_update(self):
"""
@jira_ticket: CASSANDRA-13299, test break up large partition when repairing base with mv.
Provide a configuable batch size(cassandra.mv.mutation.row.count=100) to trottle number
of rows to be applied in one mutation
"""
session = self.prepare(rf=5, options={'hinted_handoff_enabled': False}, nodes=5)
node1, node2, node3, node4, node5 = self.cluster.nodelist()
for node in self.cluster.nodelist():
node.nodetool("disableautocompaction")
session.execute("CREATE TABLE ks.t (pk int, ck1 int, ck2 int, v1 int, v2 int, PRIMARY KEY(pk, ck1, ck2))")
session.execute(("CREATE MATERIALIZED VIEW ks.t_by_v AS SELECT * FROM t "
"WHERE pk IS NOT NULL AND ck1 IS NOT NULL AND ck2 IS NOT NULL "
"PRIMARY KEY (pk, ck2, ck1)"))
session.cluster.control_connection.wait_for_schema_agreement()
logger.debug('Shutdown node2 and node3')
node2.stop(wait_other_notice=True)
node3.stop(wait_other_notice=True)
size = 50
range_deletion_ts = 30
partition_deletion_ts = 10
for ck1 in range(size):
for ck2 in range(size):
session.execute("INSERT INTO ks.t (pk, ck1, ck2, v1, v2)"
" VALUES (1, {}, {}, {}, {}) USING TIMESTAMP {}".format(ck1, ck2, ck1, ck2, ck1))
self._replay_batchlogs()
for ck1 in range(size):
for ck2 in range(size):
assert_one(session, "SELECT pk,ck1,ck2,v1,v2 FROM ks.t WHERE pk=1 AND ck1={} AND ck2={}".format(ck1, ck2),
[1, ck1, ck2, ck1, ck2])
assert_one(session, "SELECT pk,ck1,ck2,v1,v2 FROM ks.t_by_v WHERE pk=1 AND ck1={} AND ck2={}".format(ck1, ck2),
[1, ck1, ck2, ck1, ck2])
logger.debug('Shutdown node4 and node5')
node4.stop(wait_other_notice=True)
node5.stop(wait_other_notice=True)
for ck1 in range(size):
for ck2 in range(size):
if ck1 % 2 == 0: # range tombstone
session.execute("DELETE FROM ks.t USING TIMESTAMP 50 WHERE pk=1 AND ck1={}".format(ck1))
elif ck1 == ck2: # row tombstone
session.execute("DELETE FROM ks.t USING TIMESTAMP 60 WHERE pk=1 AND ck1={} AND ck2={}".format(ck1, ck2))
elif ck1 == ck2 - 1: # cell tombstone
session.execute("DELETE v2 FROM ks.t USING TIMESTAMP 70 WHERE pk=1 AND ck1={} AND ck2={}".format(ck1, ck2))
# range deletion
session.execute("DELETE FROM ks.t USING TIMESTAMP {} WHERE pk=1 and ck1 < 30 and ck1 > 20".format(range_deletion_ts))
session.execute("DELETE FROM ks.t USING TIMESTAMP {} WHERE pk=1 and ck1 = 20 and ck2 < 10".format(range_deletion_ts))
# partition deletion for ck1 <= partition_deletion_ts
session.execute("DELETE FROM ks.t USING TIMESTAMP {} WHERE pk=1".format(partition_deletion_ts))
# only partition deletion for the pk=2000
session.execute("DELETE FROM ks.t USING TIMESTAMP {} WHERE pk=2000".format(partition_deletion_ts))
self._replay_batchlogs()
# start nodes with different batch size
logger.debug('Starting nodes')
node2.start(wait_for_binary_proto=True, jvm_args=["-Dcassandra.repair.mutation_repair_rows_per_batch={}".format(2)])
node3.start(wait_for_binary_proto=True, jvm_args=["-Dcassandra.repair.mutation_repair_rows_per_batch={}".format(5)])
node4.start(wait_for_binary_proto=True, jvm_args=["-Dcassandra.repair.mutation_repair_rows_per_batch={}".format(50)])
node5.start(wait_for_binary_proto=True, jvm_args=["-Dcassandra.repair.mutation_repair_rows_per_batch={}".format(5000)])
self._replay_batchlogs()
logger.debug('repairing base table')
node1.nodetool("repair ks t")
# insert data to the deleted partition with pk=2000, they should be considered dead
session.execute("INSERT INTO ks.t (pk, ck1, ck2, v1, v2)"
" VALUES (2000, 0, 0, 0, 0) USING TIMESTAMP {}".format(partition_deletion_ts - 1))
self._replay_batchlogs()
logger.debug('stop cluster')
self.cluster.stop()
logger.debug('rolling restart to check repaired data on each node')
for node in self.cluster.nodelist():
logger.debug('starting {}'.format(node.name))
node.start(wait_for_binary_proto=True)
session = self.patient_cql_connection(node, consistency_level=ConsistencyLevel.ONE)
for ck1 in range(size):
for ck2 in range(size):
if (
ck1 <= partition_deletion_ts or # partition deletion
ck1 == ck2 or ck1 % 2 == 0 or # row deletion or range tombstone
(ck1 > 20 and ck1 < 30) or (ck1 == 20 and ck2 < 10) # range tombstone
):
assert_none(session, "SELECT pk,ck1,ck2,v1,v2 FROM ks.t_by_v WHERE pk=1 AND "
"ck1={} AND ck2={}".format(ck1, ck2))
assert_none(session, "SELECT pk,ck1,ck2,v1,v2 FROM ks.t WHERE pk=1 AND "
"ck1={} AND ck2={}".format(ck1, ck2))
elif ck1 == ck2 - 1: # cell tombstone
assert_one(session, "SELECT pk,ck1,ck2,v1,v2 FROM ks.t_by_v WHERE pk=1 AND "
"ck1={} AND ck2={}".format(ck1, ck2), [1, ck1, ck2, ck1, None])
assert_one(session, "SELECT pk,ck1,ck2,v1,v2 FROM ks.t WHERE pk=1 AND "
"ck1={} AND ck2={}".format(ck1, ck2), [1, ck1, ck2, ck1, None])
else:
assert_one(session, "SELECT pk,ck1,ck2,v1,v2 FROM ks.t_by_v WHERE pk=1 AND "
"ck1={} AND ck2={}".format(ck1, ck2), [1, ck1, ck2, ck1, ck2])
assert_one(session, "SELECT pk,ck1,ck2,v1,v2 FROM ks.t WHERE pk=1 AND "
"ck1={} AND ck2={}".format(ck1, ck2), [1, ck1, ck2, ck1, ck2])
# Verify partition deletion with pk=2000 has no live data
assert_none(session, "SELECT pk,ck1,ck2,v1,v2 FROM ks.t WHERE pk=2000")
assert_none(session, "SELECT pk,ck1,ck2,v1,v2 FROM ks.t_by_v WHERE pk=2000")
logger.debug('stopping {}'.format(node.name))
node.stop(wait_other_notice=True, wait_for_binary_proto=True)
@pytest.mark.resource_intensive
def test_really_complex_repair(self):
"""
Test that a materialized view are consistent after a more complex repair.
"""
session = self.prepare(rf=5, options={'hinted_handoff_enabled': False}, nodes=5)
node1, node2, node3, node4, node5 = self.cluster.nodelist()
# we create the base table with gc_grace_seconds=5 so batchlog will expire after 5 seconds
session.execute("CREATE TABLE ks.t (id int, v int, v2 text, v3 decimal, PRIMARY KEY(id, v, v2))"
"WITH gc_grace_seconds = 1")
session.execute(("CREATE MATERIALIZED VIEW ks.t_by_v AS SELECT * FROM t "
"WHERE v IS NOT NULL AND id IS NOT NULL AND v IS NOT NULL AND "
"v2 IS NOT NULL PRIMARY KEY (v2, v, id)"))
session.cluster.control_connection.wait_for_schema_agreement()
logger.debug('Shutdown node2 and node3')
node2.stop(wait_other_notice=True)
node3.stop(wait_other_notice=True)
session.execute("INSERT INTO ks.t (id, v, v2, v3) VALUES (1, 1, 'a', 3.0)")
session.execute("INSERT INTO ks.t (id, v, v2, v3) VALUES (2, 2, 'a', 3.0)")
self._replay_batchlogs()
logger.debug('Verify the data in the MV on node1 with CL=ONE')
assert_all(session, "SELECT * FROM ks.t_by_v WHERE v2 = 'a'", [['a', 1, 1, 3.0], ['a', 2, 2, 3.0]])
session.execute("INSERT INTO ks.t (id, v, v2, v3) VALUES (1, 1, 'b', 3.0)")
session.execute("INSERT INTO ks.t (id, v, v2, v3) VALUES (2, 2, 'b', 3.0)")
self._replay_batchlogs()
logger.debug('Verify the data in the MV on node1 with CL=ONE')
assert_all(session, "SELECT * FROM ks.t_by_v WHERE v2 = 'b'", [['b', 1, 1, 3.0], ['b', 2, 2, 3.0]])
session.shutdown()
logger.debug('Shutdown node1, node4 and node5')
node1.stop()
node4.stop()
node5.stop()
logger.debug('Start nodes 2 and 3')
node2.start()
node3.start(wait_for_binary_proto=True)
session2 = self.patient_cql_connection(node2)
session2.execute('USE ks')
logger.debug('Verify the data in the MV on node2 with CL=ONE. No rows should be found.')
assert_none(session2, "SELECT * FROM ks.t_by_v WHERE v2 = 'a'")
logger.debug('Write new data in node2 that overlap those in node1')
session2.execute("INSERT INTO ks.t (id, v, v2, v3) VALUES (1, 1, 'c', 3.0)")
session2.execute("INSERT INTO ks.t (id, v, v2, v3) VALUES (2, 2, 'c', 3.0)")
self._replay_batchlogs()
assert_all(session2, "SELECT * FROM ks.t_by_v WHERE v2 = 'c'", [['c', 1, 1, 3.0], ['c', 2, 2, 3.0]])
session2.execute("INSERT INTO ks.t (id, v, v2, v3) VALUES (1, 1, 'd', 3.0)")
session2.execute("INSERT INTO ks.t (id, v, v2, v3) VALUES (2, 2, 'd', 3.0)")
self._replay_batchlogs()
assert_all(session2, "SELECT * FROM ks.t_by_v WHERE v2 = 'd'", [['d', 1, 1, 3.0], ['d', 2, 2, 3.0]])
logger.debug("Composite delete of everything")
session2.execute("DELETE FROM ks.t WHERE id = 1 and v = 1")
session2.execute("DELETE FROM ks.t WHERE id = 2 and v = 2")
self._replay_batchlogs()
assert_none(session2, "SELECT * FROM ks.t_by_v WHERE v2 = 'c'")
assert_none(session2, "SELECT * FROM ks.t_by_v WHERE v2 = 'd'")
logger.debug('Wait for batchlogs to expire from node2 and node3')
time.sleep(5)
logger.debug('Start remaining nodes')
node1.start(wait_for_binary_proto=True)
node4.start(wait_for_binary_proto=True)
node5.start(wait_for_binary_proto=True)
# at this point the data isn't repaired so we have an inconsistency.
# this value should return None
assert_all(
session2,
"SELECT * FROM ks.t_by_v WHERE v2 = 'a'", [['a', 1, 1, 3.0], ['a', 2, 2, 3.0]],
cl=ConsistencyLevel.QUORUM
)
logger.debug('Run global repair on node1')
node1.repair()
assert_none(session2, "SELECT * FROM ks.t_by_v WHERE v2 = 'a'", cl=ConsistencyLevel.QUORUM)
def test_complex_mv_select_statements(self):
"""
Test complex MV select statements
@jira_ticket CASSANDRA-9664
"""
cluster = self.cluster
cluster.set_configuration_options({'enable_materialized_views': 'true'})
cluster.populate(3).start()
node1 = cluster.nodelist()[0]
session = self.patient_cql_connection(node1, consistency_level=ConsistencyLevel.QUORUM)
logger.debug("Creating keyspace")
session.execute("CREATE KEYSPACE mvtest WITH replication = "
"{'class': 'SimpleStrategy', 'replication_factor': '3'}")
session.execute('USE mvtest')
mv_primary_keys = ["((a, b), c)",
"((b, a), c)",
"(a, b, c)",
"(c, b, a)",
"((c, a), b)"]
for mv_primary_key in mv_primary_keys:
session.execute("CREATE TABLE test (a int, b int, c int, d int, PRIMARY KEY (a, b, c))")
insert_stmt = session.prepare("INSERT INTO test (a, b, c, d) VALUES (?, ?, ?, ?)")
update_stmt = session.prepare("UPDATE test SET d = ? WHERE a = ? AND b = ? AND c = ?")
delete_stmt1 = session.prepare("DELETE FROM test WHERE a = ? AND b = ? AND c = ?")
delete_stmt2 = session.prepare("DELETE FROM test WHERE a = ?")
session.cluster.control_connection.wait_for_schema_agreement()
rows = [(0, 0, 0, 0),
(0, 0, 1, 0),
(0, 1, 0, 0),
(0, 1, 1, 0),
(1, 0, 0, 0),
(1, 0, 1, 0),
(1, 1, -1, 0),
(1, 1, 0, 0),
(1, 1, 1, 0)]
for row in rows:
session.execute(insert_stmt, row)
logger.debug("Testing MV primary key: {}".format(mv_primary_key))
session.execute("CREATE MATERIALIZED VIEW mv AS SELECT * FROM test WHERE "
"a = 1 AND b IS NOT NULL AND c = 1 PRIMARY KEY {}".format(mv_primary_key))
time.sleep(3)
assert_all(
session, "SELECT a, b, c, d FROM mv",
[[1, 0, 1, 0], [1, 1, 1, 0]],
ignore_order=True,
cl=ConsistencyLevel.QUORUM
)
# insert new rows that does not match the filter
session.execute(insert_stmt, (0, 0, 1, 0))
session.execute(insert_stmt, (1, 1, 0, 0))
assert_all(
session, "SELECT a, b, c, d FROM mv",
[[1, 0, 1, 0], [1, 1, 1, 0]],
ignore_order=True,
cl=ConsistencyLevel.QUORUM
)
# insert new row that does match the filter
session.execute(insert_stmt, (1, 2, 1, 0))
assert_all(
session, "SELECT a, b, c, d FROM mv",
[[1, 0, 1, 0], [1, 1, 1, 0], [1, 2, 1, 0]],
ignore_order=True,
cl=ConsistencyLevel.QUORUM
)
# update rows that does not match the filter
session.execute(update_stmt, (1, 1, -1, 0))
session.execute(update_stmt, (0, 1, 1, 0))
assert_all(
session, "SELECT a, b, c, d FROM mv",
[[1, 0, 1, 0], [1, 1, 1, 0], [1, 2, 1, 0]],
ignore_order=True,
cl=ConsistencyLevel.QUORUM
)
# update a row that does match the filter
session.execute(update_stmt, (2, 1, 1, 1))
assert_all(
session, "SELECT a, b, c, d FROM mv",
[[1, 0, 1, 0], [1, 1, 1, 2], [1, 2, 1, 0]],
ignore_order=True,
cl=ConsistencyLevel.QUORUM
)
# delete rows that does not match the filter
session.execute(delete_stmt1, (1, 1, -1))
session.execute(delete_stmt1, (2, 0, 1))
session.execute(delete_stmt2, (0,))
assert_all(
session, "SELECT a, b, c, d FROM mv",
[[1, 0, 1, 0], [1, 1, 1, 2], [1, 2, 1, 0]],
ignore_order=True,
cl=ConsistencyLevel.QUORUM
)