blob: a172c922a574c0dde114bb42b2c8ef1058e5217c [file] [log] [blame]
import random
import threading
import uuid
import logging
from cassandra import ConsistencyLevel
from cassandra.query import SimpleStatement
from dtest import Tester, create_ks
logger = logging.getLogger(__name__)
class TestDeleteInsert(Tester):
"""
Examines scenarios around deleting data and adding data back with the same key
"""
# Generate 1000 rows in memory so we can re-use the same ones over again:
rows = [(str(uuid.uuid1()), x, random.choice(['group1', 'group2', 'group3', 'group4'])) for x in range(1000)]
def create_ddl(self, session, rf={'dc1': 2, 'dc2': 2}):
create_ks(session, 'delete_insert_search_test', rf)
session.execute('CREATE TABLE test (id uuid PRIMARY KEY, val1 text, group text)')
session.execute('CREATE INDEX group_idx ON test (group)')
def delete_group_rows(self, session, group):
"""Delete rows from a given group and return them"""
rows = [r for r in self.rows if r[2] == group]
ids = [r[0] for r in rows]
session.execute('DELETE FROM test WHERE id in (%s)' % ', '.join(ids))
return list(rows)
def insert_all_rows(self, session):
self.insert_some_rows(session, self.rows)
def insert_some_rows(self, session, rows):
for row in rows:
session.execute("INSERT INTO test (id, val1, group) VALUES (%s, '%s', '%s')" % row)
def test_delete_insert_search(self):
cluster = self.cluster
cluster.populate([2, 2]).start()
node1 = cluster.nodelist()[0]
session = self.patient_cql_connection(node1)
session.consistency_level = 'LOCAL_QUORUM'
self.create_ddl(session)
# Create 1000 rows:
self.insert_all_rows(session)
# Delete all of group2:
deleted = self.delete_group_rows(session, 'group2')
# Put that group back:
self.insert_some_rows(session, rows=deleted)
# Verify that all of group2 is back, 20 times, in parallel
# querying across all nodes:
class ThreadedQuery(threading.Thread):
def __init__(self, connection):
threading.Thread.__init__(self)
self.connection = connection
def run(self):
session = self.connection
query = SimpleStatement("SELECT * FROM delete_insert_search_test.test WHERE group = 'group2'",
consistency_level=ConsistencyLevel.LOCAL_QUORUM)
rows = session.execute(query)
assert len(list(rows)) == len(deleted), "Expecting the length of {} to be equal to the " \
"length of {}.".format(list(rows), deleted)
threads = []
for x in range(20):
conn = self.cql_connection(random.choice(cluster.nodelist()))
threads.append(ThreadedQuery(conn))
for t in threads:
t.start()
for t in threads:
t.join()