blob: 4d2d90635b39d75d49f0950876c438772be8dbe8 [file] [log] [blame]
import pytest
import time
import logging
from cassandra.concurrent import execute_concurrent_with_args
from dtest import Tester, create_ks, create_cf_simple
logger = logging.getLogger(__name__)
class TestGlobalRowKeyCache(Tester):
@pytest.fixture(autouse=True)
def fixture_add_additional_log_patterns(self, fixture_dtest_setup):
fixture_dtest_setup.ignore_log_patterns = (
r'Failed to load Java8 implementation ohc-core-j8'
)
def test_functional(self):
cluster = self.cluster
cluster.populate(3)
node1 = cluster.nodelist()[0]
for keycache_size in (0, 10):
for rowcache_size in (0, 10):
cluster.stop()
logger.debug("Testing with keycache size of %d MB, rowcache size of %d MB " %
(keycache_size, rowcache_size))
keyspace_name = 'ks_%d_%d' % (keycache_size, rowcache_size)
# make the caches save every five seconds
cluster.set_configuration_options(values={
'key_cache_size_in_mb': keycache_size,
'row_cache_size_in_mb': rowcache_size,
'row_cache_save_period': 5,
'key_cache_save_period': 5,
})
cluster.start()
session = self.patient_cql_connection(node1)
create_ks(session, keyspace_name, rf=3)
session.set_keyspace(keyspace_name)
create_cf_simple(session, 'test', "CREATE TABLE test (k int PRIMARY KEY, v1 int, v2 int)")
create_cf_simple(session, 'test_clustering',
"CREATE TABLE test_clustering (k int, v1 int, v2 int, PRIMARY KEY (k, v1))")
create_cf_simple(session, 'test_counter', "CREATE TABLE test_counter (k int PRIMARY KEY, v1 counter)")
create_cf_simple(session, 'test_counter_clustering',
"CREATE TABLE test_counter_clustering (k int, v1 int, v2 counter, PRIMARY KEY (k, v1))")
# insert 100 rows into each table
for cf in ('test', 'test_clustering'):
execute_concurrent_with_args(
session, session.prepare("INSERT INTO %s (k, v1, v2) VALUES (?, ?, ?)" % (cf,)),
[(i, i, i) for i in range(100)])
execute_concurrent_with_args(
session, session.prepare("UPDATE test_counter SET v1 = v1 + ? WHERE k = ?"),
[(i, i) for i in range(100)],
concurrency=2)
execute_concurrent_with_args(
session, session.prepare("UPDATE test_counter_clustering SET v2 = v2 + ? WHERE k = ? AND v1 = ?"),
[(i, i, i) for i in range(100)],
concurrency=2)
# flush everything to get it into sstables
for node in cluster.nodelist():
node.flush()
# update the first 10 rows in every table
# on non-counter tables, delete the first (remaining) row each round
num_updates = 10
for validation_round in range(3):
session.execute("DELETE FROM test WHERE k = %s", (validation_round,))
execute_concurrent_with_args(
session, session.prepare("UPDATE test SET v1 = ?, v2 = ? WHERE k = ?"),
[(i, validation_round, i) for i in range(validation_round + 1, num_updates)])
session.execute("DELETE FROM test_clustering WHERE k = %s AND v1 = %s", (validation_round, validation_round))
execute_concurrent_with_args(
session, session.prepare("UPDATE test_clustering SET v2 = ? WHERE k = ? AND v1 = ?"),
[(validation_round, i, i) for i in range(validation_round + 1, num_updates)])
execute_concurrent_with_args(
session, session.prepare("UPDATE test_counter SET v1 = v1 + ? WHERE k = ?"),
[(1, i) for i in range(num_updates)],
concurrency=2)
execute_concurrent_with_args(
session, session.prepare("UPDATE test_counter_clustering SET v2 = v2 + ? WHERE k = ? AND v1 = ?"),
[(1, i, i) for i in range(num_updates)],
concurrency=2)
self._validate_values(session, num_updates, validation_round)
session.shutdown()
# let the data be written to the row/key caches.
logger.debug("Letting caches be saved to disk")
time.sleep(10)
logger.debug("Stopping cluster")
cluster.stop()
time.sleep(1)
logger.debug("Starting cluster")
cluster.start()
time.sleep(5) # read the data back from row and key caches
session = self.patient_cql_connection(node1)
session.set_keyspace(keyspace_name)
# check all values again
self._validate_values(session, num_updates, validation_round=2)
def _validate_values(self, session, num_updates, validation_round):
# check values of non-counter tables
for cf in ('test', 'test_clustering'):
rows = list(session.execute("SELECT * FROM %s" % (cf,)))
# one row gets deleted each validation round
assert 100 - (validation_round + 1) == len(rows)
# adjust enumeration start to account for row deletions
for i, row in enumerate(sorted(rows), start=(validation_round + 1)):
assert i == row.k
assert i == row.v1
# updated rows will have different values
expected_value = validation_round if i < num_updates else i
assert expected_value == row.v2
# check values of counter tables
rows = list(session.execute("SELECT * FROM test_counter"))
assert 100 == len(rows)
for i, row in enumerate(sorted(rows)):
assert i == row.k
# updated rows will get incremented once each round
expected_value = i
if i < num_updates:
expected_value += validation_round + 1
assert expected_value == row.v1
rows = list(session.execute("SELECT * FROM test_counter_clustering"))
assert 100 == len(rows)
for i, row in enumerate(sorted(rows)):
assert i == row.k
assert i == row.v1
expected_value = i
if i < num_updates:
expected_value += validation_round + 1
assert expected_value == row.v2