blob: bb0e269ff771aeeedc9622b4c0e3cce4ef3dff6e [file] [log] [blame]
package org.apache.cassandra.cql3;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.cassandra.concurrent.SEPExecutor;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.utils.FBUtilities;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import com.google.common.base.Objects;
public class ViewComplexTest extends CQLTester
{
int protocolVersion = 4;
private final List<String> views = new ArrayList<>();
@BeforeClass
public static void startup()
{
requireNetwork();
}
@Before
public void begin()
{
views.clear();
}
@After
public void end() throws Throwable
{
for (String viewName : views)
executeNet(protocolVersion, "DROP MATERIALIZED VIEW " + viewName);
}
private void createView(String name, String query) throws Throwable
{
executeNet(protocolVersion, String.format(query, name));
// If exception is thrown, the view will not be added to the list; since it shouldn't have been created, this is
// the desired behavior
views.add(name);
}
private void updateView(String query, Object... params) throws Throwable
{
updateViewWithFlush(query, false, params);
}
private void updateViewWithFlush(String query, boolean flush, Object... params) throws Throwable
{
executeNet(protocolVersion, query, params);
while (!(((SEPExecutor) StageManager.getStage(Stage.VIEW_MUTATION)).getPendingTasks() == 0
&& ((SEPExecutor) StageManager.getStage(Stage.VIEW_MUTATION)).getActiveCount() == 0))
{
Thread.sleep(1);
}
if (flush)
Keyspace.open(keyspace()).flush();
}
// for now, unselected column cannot be fully supported, SEE CASSANDRA-13826
@Ignore
@Test
public void testPartialDeleteUnselectedColumn() throws Throwable
{
boolean flush = true;
execute("USE " + keyspace());
executeNet(protocolVersion, "USE " + keyspace());
createTable("CREATE TABLE %s (k int, c int, a int, b int, PRIMARY KEY (k, c))");
createView("mv",
"CREATE MATERIALIZED VIEW %s AS SELECT k,c FROM %%s WHERE k IS NOT NULL AND c IS NOT NULL PRIMARY KEY (k,c)");
Keyspace ks = Keyspace.open(keyspace());
ks.getColumnFamilyStore("mv").disableAutoCompaction();
updateView("UPDATE %s USING TIMESTAMP 10 SET b=1 WHERE k=1 AND c=1");
if (flush)
FBUtilities.waitOnFutures(ks.flush());
assertRows(execute("SELECT * from %s"), row(1, 1, null, 1));
assertRows(execute("SELECT * from mv"), row(1, 1));
updateView("DELETE b FROM %s USING TIMESTAMP 11 WHERE k=1 AND c=1");
if (flush)
FBUtilities.waitOnFutures(ks.flush());
assertEmpty(execute("SELECT * from %s"));
assertEmpty(execute("SELECT * from mv"));
updateView("UPDATE %s USING TIMESTAMP 1 SET a=1 WHERE k=1 AND c=1");
if (flush)
FBUtilities.waitOnFutures(ks.flush());
assertRows(execute("SELECT * from %s"), row(1, 1, 1, null));
assertRows(execute("SELECT * from mv"), row(1, 1));
execute("truncate %s;");
// removal generated by unselected column should not shadow PK update with smaller timestamp
updateViewWithFlush("UPDATE %s USING TIMESTAMP 18 SET a=1 WHERE k=1 AND c=1", flush);
assertRows(execute("SELECT * from %s"), row(1, 1, 1, null));
assertRows(execute("SELECT * from mv"), row(1, 1));
updateViewWithFlush("UPDATE %s USING TIMESTAMP 20 SET a=null WHERE k=1 AND c=1", flush);
assertRows(execute("SELECT * from %s"));
assertRows(execute("SELECT * from mv"));
updateViewWithFlush("INSERT INTO %s(k,c) VALUES(1,1) USING TIMESTAMP 15", flush);
assertRows(execute("SELECT * from %s"), row(1, 1, null, null));
assertRows(execute("SELECT * from mv"), row(1, 1));
}
@Test
public void testPartialDeleteSelectedColumnWithFlush() throws Throwable
{
testPartialDeleteSelectedColumn(true);
}
@Test
public void testPartialDeleteSelectedColumnWithoutFlush() throws Throwable
{
testPartialDeleteSelectedColumn(false);
}
private void testPartialDeleteSelectedColumn(boolean flush) throws Throwable
{
execute("USE " + keyspace());
executeNet(protocolVersion, "USE " + keyspace());
createTable("CREATE TABLE %s (k int, c int, a int, b int, e int, f int, PRIMARY KEY (k, c))");
createView("mv",
"CREATE MATERIALIZED VIEW %s AS SELECT a, b FROM %%s WHERE k IS NOT NULL AND c IS NOT NULL PRIMARY KEY (k,c)");
Keyspace ks = Keyspace.open(keyspace());
ks.getColumnFamilyStore("mv").disableAutoCompaction();
updateViewWithFlush("UPDATE %s USING TIMESTAMP 10 SET b=1 WHERE k=1 AND c=1", flush);
assertRows(execute("SELECT * from %s"), row(1, 1, null, 1, null, null));
assertRows(execute("SELECT * from mv"), row(1, 1, null, 1));
updateViewWithFlush("DELETE b FROM %s USING TIMESTAMP 11 WHERE k=1 AND c=1", flush);
assertEmpty(execute("SELECT * from %s"));
assertEmpty(execute("SELECT * from mv"));
updateViewWithFlush("UPDATE %s USING TIMESTAMP 1 SET a=1 WHERE k=1 AND c=1", flush);
assertRows(execute("SELECT * from %s"), row(1, 1, 1, null, null, null));
assertRows(execute("SELECT * from mv"), row(1, 1, 1, null));
updateViewWithFlush("DELETE a FROM %s USING TIMESTAMP 1 WHERE k=1 AND c=1", flush);
assertEmpty(execute("SELECT * from %s"));
assertEmpty(execute("SELECT * from mv"));
// view livenessInfo should not be affected by selected column ts or tb
updateViewWithFlush("INSERT INTO %s(k,c) VALUES(1,1) USING TIMESTAMP 0", flush);
assertRows(execute("SELECT * from %s"), row(1, 1, null, null, null, null));
assertRows(execute("SELECT * from mv"), row(1, 1, null, null));
updateViewWithFlush("UPDATE %s USING TIMESTAMP 12 SET b=1 WHERE k=1 AND c=1", flush);
assertRows(execute("SELECT * from %s"), row(1, 1, null, 1, null, null));
assertRows(execute("SELECT * from mv"), row(1, 1, null, 1));
updateViewWithFlush("DELETE b FROM %s USING TIMESTAMP 13 WHERE k=1 AND c=1", flush);
assertRows(execute("SELECT * from %s"), row(1, 1, null, null, null, null));
assertRows(execute("SELECT * from mv"), row(1, 1, null, null));
updateViewWithFlush("DELETE FROM %s USING TIMESTAMP 14 WHERE k=1 AND c=1", flush);
assertEmpty(execute("SELECT * from %s"));
assertEmpty(execute("SELECT * from mv"));
updateViewWithFlush("INSERT INTO %s(k,c) VALUES(1,1) USING TIMESTAMP 15", flush);
assertRows(execute("SELECT * from %s"), row(1, 1, null, null, null, null));
assertRows(execute("SELECT * from mv"), row(1, 1, null, null));
updateViewWithFlush("UPDATE %s USING TTL 3 SET b=1 WHERE k=1 AND c=1", flush);
assertRows(execute("SELECT * from %s"), row(1, 1, null, 1, null, null));
assertRows(execute("SELECT * from mv"), row(1, 1, null, 1));
TimeUnit.SECONDS.sleep(4);
assertRows(execute("SELECT * from %s"), row(1, 1, null, null, null, null));
assertRows(execute("SELECT * from mv"), row(1, 1, null, null));
updateViewWithFlush("DELETE FROM %s USING TIMESTAMP 15 WHERE k=1 AND c=1", flush);
assertEmpty(execute("SELECT * from %s"));
assertEmpty(execute("SELECT * from mv"));
execute("truncate %s;");
// removal generated by unselected column should not shadow selected column with smaller timestamp
updateViewWithFlush("UPDATE %s USING TIMESTAMP 18 SET e=1 WHERE k=1 AND c=1", flush);
assertRows(execute("SELECT * from %s"), row(1, 1, null, null, 1, null));
assertRows(execute("SELECT * from mv"), row(1, 1, null, null));
updateViewWithFlush("UPDATE %s USING TIMESTAMP 18 SET e=null WHERE k=1 AND c=1", flush);
assertRows(execute("SELECT * from %s"));
assertRows(execute("SELECT * from mv"));
updateViewWithFlush("UPDATE %s USING TIMESTAMP 16 SET a=1 WHERE k=1 AND c=1", flush);
assertRows(execute("SELECT * from %s"), row(1, 1, 1, null, null, null));
assertRows(execute("SELECT * from mv"), row(1, 1, 1, null));
}
@Test
public void testUpdateColumnInViewPKWithTTLWithFlush() throws Throwable
{
// CASSANDRA-13657
testUpdateColumnInViewPKWithTTL(true);
}
@Test
public void testUpdateColumnInViewPKWithTTLWithoutFlush() throws Throwable
{
// CASSANDRA-13657
testUpdateColumnInViewPKWithTTL(false);
}
private void testUpdateColumnInViewPKWithTTL(boolean flush) throws Throwable
{
// CASSANDRA-13657 if base column used in view pk is ttled, then view row is considered dead
createTable("create table %s (k int primary key, a int, b int)");
execute("USE " + keyspace());
executeNet(protocolVersion, "USE " + keyspace());
Keyspace ks = Keyspace.open(keyspace());
createView("mv",
"CREATE MATERIALIZED VIEW %s AS SELECT * FROM %%s WHERE k IS NOT NULL AND a IS NOT NULL PRIMARY KEY (a, k)");
ks.getColumnFamilyStore("mv").disableAutoCompaction();
updateView("UPDATE %s SET a = 1 WHERE k = 1;");
if (flush)
FBUtilities.waitOnFutures(ks.flush());
assertRows(execute("SELECT * from %s"), row(1, 1, null));
assertRows(execute("SELECT * from mv"), row(1, 1, null));
updateView("DELETE a FROM %s WHERE k = 1");
if (flush)
FBUtilities.waitOnFutures(ks.flush());
assertRows(execute("SELECT * from %s"));
assertEmpty(execute("SELECT * from mv"));
updateView("INSERT INTO %s (k) VALUES (1);");
if (flush)
FBUtilities.waitOnFutures(ks.flush());
assertRows(execute("SELECT * from %s"), row(1, null, null));
assertEmpty(execute("SELECT * from mv"));
updateView("UPDATE %s USING TTL 5 SET a = 10 WHERE k = 1;");
if (flush)
FBUtilities.waitOnFutures(ks.flush());
assertRows(execute("SELECT * from %s"), row(1, 10, null));
assertRows(execute("SELECT * from mv"), row(10, 1, null));
updateView("UPDATE %s SET b = 100 WHERE k = 1;");
if (flush)
FBUtilities.waitOnFutures(ks.flush());
assertRows(execute("SELECT * from %s"), row(1, 10, 100));
assertRows(execute("SELECT * from mv"), row(10, 1, 100));
Thread.sleep(5000);
// 'a' is TTL of 5 and removed.
assertRows(execute("SELECT * from %s"), row(1, null, 100));
assertEmpty(execute("SELECT * from mv"));
assertEmpty(execute("SELECT * from mv WHERE k = ? AND a = ?", 1, 10));
updateView("DELETE b FROM %s WHERE k=1");
if (flush)
FBUtilities.waitOnFutures(ks.flush());
assertRows(execute("SELECT * from %s"), row(1, null, null));
assertEmpty(execute("SELECT * from mv"));
updateView("DELETE FROM %s WHERE k=1;");
if (flush)
FBUtilities.waitOnFutures(ks.flush());
assertEmpty(execute("SELECT * from %s"));
assertEmpty(execute("SELECT * from mv"));
}
@Test
public void testUpdateColumnNotInViewWithFlush() throws Throwable
{
testUpdateColumnNotInView(true);
}
@Test
public void testUpdateColumnNotInViewWithoutFlush() throws Throwable
{
// CASSANDRA-13127
testUpdateColumnNotInView(false);
}
private void testUpdateColumnNotInView(boolean flush) throws Throwable
{
// CASSANDRA-13127: if base column not selected in view are alive, then pk of view row should be alive
String baseTable = createTable("create table %s (p int, c int, v1 int, v2 int, primary key(p, c))");
execute("USE " + keyspace());
executeNet(protocolVersion, "USE " + keyspace());
Keyspace ks = Keyspace.open(keyspace());
createView("mv",
"CREATE MATERIALIZED VIEW %s AS SELECT p, c FROM %%s WHERE p IS NOT NULL AND c IS NOT NULL PRIMARY KEY (c, p);");
ks.getColumnFamilyStore("mv").disableAutoCompaction();
updateView("UPDATE %s USING TIMESTAMP 0 SET v1 = 1 WHERE p = 0 AND c = 0");
if (flush)
FBUtilities.waitOnFutures(ks.flush());
assertRowsIgnoringOrder(execute("SELECT * from %s WHERE c = ? AND p = ?", 0, 0), row(0, 0, 1, null));
assertRowsIgnoringOrder(execute("SELECT * from mv WHERE c = ? AND p = ?", 0, 0), row(0, 0));
updateView("DELETE v1 FROM %s USING TIMESTAMP 1 WHERE p = 0 AND c = 0");
if (flush)
FBUtilities.waitOnFutures(ks.flush());
assertEmpty(execute("SELECT * from %s WHERE c = ? AND p = ?", 0, 0));
assertEmpty(execute("SELECT * from mv WHERE c = ? AND p = ?", 0, 0));
// shadowed by tombstone
updateView("UPDATE %s USING TIMESTAMP 1 SET v1 = 1 WHERE p = 0 AND c = 0");
if (flush)
FBUtilities.waitOnFutures(ks.flush());
assertEmpty(execute("SELECT * from %s WHERE c = ? AND p = ?", 0, 0));
assertEmpty(execute("SELECT * from mv WHERE c = ? AND p = ?", 0, 0));
updateView("UPDATE %s USING TIMESTAMP 2 SET v2 = 1 WHERE p = 0 AND c = 0");
if (flush)
FBUtilities.waitOnFutures(ks.flush());
assertRowsIgnoringOrder(execute("SELECT * from %s WHERE c = ? AND p = ?", 0, 0), row(0, 0, null, 1));
assertRowsIgnoringOrder(execute("SELECT * from mv WHERE c = ? AND p = ?", 0, 0), row(0, 0));
updateView("DELETE v1 FROM %s USING TIMESTAMP 3 WHERE p = 0 AND c = 0");
if (flush)
FBUtilities.waitOnFutures(ks.flush());
assertRowsIgnoringOrder(execute("SELECT * from %s WHERE c = ? AND p = ?", 0, 0), row(0, 0, null, 1));
assertRowsIgnoringOrder(execute("SELECT * from mv WHERE c = ? AND p = ?", 0, 0), row(0, 0));
updateView("DELETE v2 FROM %s USING TIMESTAMP 4 WHERE p = 0 AND c = 0");
if (flush)
FBUtilities.waitOnFutures(ks.flush());
assertEmpty(execute("SELECT * from %s WHERE c = ? AND p = ?", 0, 0));
assertEmpty(execute("SELECT * from mv WHERE c = ? AND p = ?", 0, 0));
updateView("UPDATE %s USING TTL 3 SET v2 = 1 WHERE p = 0 AND c = 0");
if (flush)
FBUtilities.waitOnFutures(ks.flush());
assertRowsIgnoringOrder(execute("SELECT * from %s WHERE c = ? AND p = ?", 0, 0), row(0, 0, null, 1));
assertRowsIgnoringOrder(execute("SELECT * from mv WHERE c = ? AND p = ?", 0, 0), row(0, 0));
Thread.sleep(TimeUnit.SECONDS.toMillis(3));
assertRowsIgnoringOrder(execute("SELECT * from %s WHERE c = ? AND p = ?", 0, 0));
assertRowsIgnoringOrder(execute("SELECT * from mv WHERE c = ? AND p = ?", 0, 0));
updateView("UPDATE %s SET v2 = 1 WHERE p = 0 AND c = 0");
if (flush)
FBUtilities.waitOnFutures(ks.flush());
assertRowsIgnoringOrder(execute("SELECT * from %s WHERE c = ? AND p = ?", 0, 0), row(0, 0, null, 1));
assertRowsIgnoringOrder(execute("SELECT * from mv WHERE c = ? AND p = ?", 0, 0), row(0, 0));
assertInvalidMessage(String.format("Cannot drop column v2 on base table %s with materialized views.", baseTable), "ALTER TABLE %s DROP v2");
// // drop unselected base column, unselected metadata should be removed, thus view row is dead
// updateView("ALTER TABLE %s DROP v2");
// assertRowsIgnoringOrder(execute("SELECT * from %s WHERE c = ? AND p = ?", 0, 0));
// assertRowsIgnoringOrder(execute("SELECT * from mv WHERE c = ? AND p = ?", 0, 0));
// assertRowsIgnoringOrder(execute("SELECT * from %s"));
// assertRowsIgnoringOrder(execute("SELECT * from mv"));
}
@Test
public void testPartialUpdateWithUnselectedCollectionsWithFlush() throws Throwable
{
testPartialUpdateWithUnselectedCollections(true);
}
@Test
public void testPartialUpdateWithUnselectedCollectionsWithoutFlush() throws Throwable
{
testPartialUpdateWithUnselectedCollections(false);
}
public void testPartialUpdateWithUnselectedCollections(boolean flush) throws Throwable
{
execute("USE " + keyspace());
executeNet(protocolVersion, "USE " + keyspace());
String baseTable = createTable("CREATE TABLE %s (k int, c int, a int, b int, l list<int>, s set<int>, m map<int,int>, PRIMARY KEY (k, c))");
createView("mv",
"CREATE MATERIALIZED VIEW %s AS SELECT a, b FROM %%s WHERE k IS NOT NULL AND c IS NOT NULL PRIMARY KEY (c, k)");
Keyspace ks = Keyspace.open(keyspace());
ks.getColumnFamilyStore("mv").disableAutoCompaction();
updateView("UPDATE %s SET l=l+[1,2,3] WHERE k = 1 AND c = 1");
if (flush)
FBUtilities.waitOnFutures(ks.flush());
assertRows(execute("SELECT * from mv"), row(1, 1, null, null));
updateView("UPDATE %s SET l=l-[1,2] WHERE k = 1 AND c = 1");
if (flush)
FBUtilities.waitOnFutures(ks.flush());
assertRows(execute("SELECT * from mv"), row(1, 1, null, null));
updateView("UPDATE %s SET b=3 WHERE k=1 AND c=1");
if (flush)
FBUtilities.waitOnFutures(ks.flush());
assertRows(execute("SELECT * from mv"), row(1, 1, null, 3));
updateView("UPDATE %s SET b=null, l=l-[3], s=s-{3} WHERE k = 1 AND c = 1");
if (flush)
{
FBUtilities.waitOnFutures(ks.flush());
ks.getColumnFamilyStore("mv").forceMajorCompaction();
}
assertRowsIgnoringOrder(execute("SELECT k,c,a,b from %s"));
assertRowsIgnoringOrder(execute("SELECT * from mv"));
updateView("UPDATE %s SET m=m+{3:3}, l=l-[1], s=s-{2} WHERE k = 1 AND c = 1");
if (flush)
FBUtilities.waitOnFutures(ks.flush());
assertRowsIgnoringOrder(execute("SELECT k,c,a,b from %s"), row(1, 1, null, null));
assertRowsIgnoringOrder(execute("SELECT * from mv"), row(1, 1, null, null));
assertInvalidMessage(String.format("Cannot drop column m on base table %s with materialized views.", baseTable), "ALTER TABLE %s DROP m");
// executeNet(protocolVersion, "ALTER TABLE %s DROP m");
// ks.getColumnFamilyStore("mv").forceMajorCompaction();
// assertRowsIgnoringOrder(execute("SELECT k,c,a,b from %s WHERE k = 1 AND c = 1"));
// assertRowsIgnoringOrder(execute("SELECT * from mv WHERE k = 1 AND c = 1"));
// assertRowsIgnoringOrder(execute("SELECT k,c,a,b from %s"));
// assertRowsIgnoringOrder(execute("SELECT * from mv"));
}
@Test
public void testUnselectedColumnsTTLWithFlush() throws Throwable
{
// CASSANDRA-13127
testUnselectedColumnsTTL(true);
}
@Test
public void testUnselectedColumnsTTLWithoutFlush() throws Throwable
{
// CASSANDRA-13127
testUnselectedColumnsTTL(false);
}
private void testUnselectedColumnsTTL(boolean flush) throws Throwable
{
// CASSANDRA-13127 not ttled unselected column in base should keep view row alive
createTable("create table %s (p int, c int, v int, primary key(p, c))");
execute("USE " + keyspace());
executeNet(protocolVersion, "USE " + keyspace());
Keyspace ks = Keyspace.open(keyspace());
createView("mv",
"CREATE MATERIALIZED VIEW %s AS SELECT p, c FROM %%s WHERE p IS NOT NULL AND c IS NOT NULL PRIMARY KEY (c, p);");
ks.getColumnFamilyStore("mv").disableAutoCompaction();
updateViewWithFlush("INSERT INTO %s (p, c) VALUES (0, 0) USING TTL 3;", flush);
updateViewWithFlush("UPDATE %s USING TTL 1000 SET v = 0 WHERE p = 0 and c = 0;", flush);
assertRowsIgnoringOrder(execute("SELECT * from mv WHERE c = ? AND p = ?", 0, 0), row(0, 0));
Thread.sleep(3000);
UntypedResultSet.Row row = execute("SELECT v, ttl(v) from %s WHERE c = ? AND p = ?", 0, 0).one();
assertTrue("row should have value of 0", row.getInt("v") == 0);
assertTrue("row should have ttl less than 1000", row.getInt("ttl(v)") < 1000);
assertRowsIgnoringOrder(execute("SELECT * from mv WHERE c = ? AND p = ?", 0, 0), row(0, 0));
updateViewWithFlush("DELETE FROM %s WHERE p = 0 and c = 0;", flush);
assertRowsIgnoringOrder(execute("SELECT * from mv WHERE c = ? AND p = ?", 0, 0));
updateViewWithFlush("INSERT INTO %s (p, c) VALUES (0, 0) ", flush);
assertRowsIgnoringOrder(execute("SELECT * from mv WHERE c = ? AND p = ?", 0, 0), row(0, 0));
// already have a live row, no need to apply the unselected cell ttl
updateViewWithFlush("UPDATE %s USING TTL 3 SET v = 0 WHERE p = 0 and c = 0;", flush);
assertRowsIgnoringOrder(execute("SELECT * from mv WHERE c = ? AND p = ?", 0, 0), row(0, 0));
updateViewWithFlush("INSERT INTO %s (p, c) VALUES (1, 1) USING TTL 3", flush);
assertRowsIgnoringOrder(execute("SELECT * from mv WHERE c = ? AND p = ?", 1, 1), row(1, 1));
Thread.sleep(4000);
assertRowsIgnoringOrder(execute("SELECT * from mv WHERE c = ? AND p = ?", 0, 0), row(0, 0));
assertRowsIgnoringOrder(execute("SELECT * from mv WHERE c = ? AND p = ?", 1, 1));
// unselected should keep view row alive
updateViewWithFlush("UPDATE %s SET v = 0 WHERE p = 1 and c = 1;", flush);
assertRowsIgnoringOrder(execute("SELECT * from mv WHERE c = ? AND p = ?", 1, 1), row(1, 1));
}
@Test
public void testRangeDeletionWithFlush() throws Throwable
{
testRangeDeletion(true);
}
@Test
public void testRangeDeletionWithoutFlush() throws Throwable
{
testRangeDeletion(false);
}
public void testRangeDeletion(boolean flush) throws Throwable
{
// for partition range deletion, need to know that existing row is shadowed instead of not existed.
createTable("CREATE TABLE %s (a int, b int, c int, d int, PRIMARY KEY (a))");
execute("USE " + keyspace());
executeNet(protocolVersion, "USE " + keyspace());
createView("mv_test1",
"CREATE MATERIALIZED VIEW %s AS SELECT * FROM %%s WHERE a IS NOT NULL AND b IS NOT NULL PRIMARY KEY (a, b)");
Keyspace ks = Keyspace.open(keyspace());
ks.getColumnFamilyStore("mv_test1").disableAutoCompaction();
execute("INSERT INTO %s (a, b, c, d) VALUES (?, ?, ?, ?) using timestamp 0", 1, 1, 1, 1);
if (flush)
FBUtilities.waitOnFutures(ks.flush());
assertRowsIgnoringOrder(execute("SELECT * FROM mv_test1"), row(1, 1, 1, 1));
// remove view row
updateView("UPDATE %s using timestamp 1 set b = null WHERE a=1");
if (flush)
FBUtilities.waitOnFutures(ks.flush());
assertRowsIgnoringOrder(execute("SELECT * FROM mv_test1"));
// remove base row, no view updated generated.
updateView("DELETE FROM %s using timestamp 2 where a=1");
if (flush)
FBUtilities.waitOnFutures(ks.flush());
assertRowsIgnoringOrder(execute("SELECT * FROM mv_test1"));
// restor view row with b,c column. d is still tombstone
updateView("UPDATE %s using timestamp 3 set b = 1,c = 1 where a=1"); // upsert
if (flush)
FBUtilities.waitOnFutures(ks.flush());
assertRowsIgnoringOrder(execute("SELECT * FROM mv_test1"), row(1, 1, 1, null));
}
@Test
public void testBaseTTLWithSameTimestampTest() throws Throwable
{
// CASSANDRA-13127 when liveness timestamp tie, greater localDeletionTime should win if both are expiring.
createTable("create table %s (p int, c int, v int, primary key(p, c))");
execute("USE " + keyspace());
executeNet(protocolVersion, "USE " + keyspace());
Keyspace ks = Keyspace.open(keyspace());
updateView("INSERT INTO %s (p, c, v) VALUES (0, 0, 0) using timestamp 1;");
FBUtilities.waitOnFutures(ks.flush());
updateView("INSERT INTO %s (p, c, v) VALUES (0, 0, 0) USING TTL 3 and timestamp 1;");
FBUtilities.waitOnFutures(ks.flush());
Thread.sleep(4000);
assertEmpty(execute("SELECT * from %s WHERE c = ? AND p = ?", 0, 0));
// reversed order
execute("truncate %s;");
updateView("INSERT INTO %s (p, c, v) VALUES (0, 0, 0) USING TTL 3 and timestamp 1;");
FBUtilities.waitOnFutures(ks.flush());
updateView("INSERT INTO %s (p, c, v) VALUES (0, 0, 0) USING timestamp 1;");
FBUtilities.waitOnFutures(ks.flush());
Thread.sleep(4000);
assertEmpty(execute("SELECT * from %s WHERE c = ? AND p = ?", 0, 0));
}
@Test
public void testCommutativeRowDeletionFlush() throws Throwable
{
// CASSANDRA-13409
testCommutativeRowDeletion(true);
}
@Test
public void testCommutativeRowDeletionWithoutFlush() throws Throwable
{
// CASSANDRA-13409
testCommutativeRowDeletion(false);
}
private void testCommutativeRowDeletion(boolean flush) throws Throwable
{
// CASSANDRA-13409 new update should not resurrect previous deleted data in view
createTable("create table %s (p int primary key, v1 int, v2 int)");
execute("USE " + keyspace());
executeNet(protocolVersion, "USE " + keyspace());
Keyspace ks = Keyspace.open(keyspace());
createView("mv",
"create materialized view %s as select * from %%s where p is not null and v1 is not null primary key (v1, p);");
ks.getColumnFamilyStore("mv").disableAutoCompaction();
// sstable-1, Set initial values TS=1
updateView("Insert into %s (p, v1, v2) values (3, 1, 3) using timestamp 1;");
if (flush)
FBUtilities.waitOnFutures(ks.flush());
assertRowsIgnoringOrder(execute("SELECT v2, WRITETIME(v2) from mv WHERE v1 = ? AND p = ?", 1, 3), row(3, 1L));
// sstable-2
updateView("Delete from %s using timestamp 2 where p = 3;");
if (flush)
FBUtilities.waitOnFutures(ks.flush());
assertRowsIgnoringOrder(execute("SELECT v1, p, v2, WRITETIME(v2) from mv"));
// sstable-3
updateView("Insert into %s (p, v1) values (3, 1) using timestamp 3;");
if (flush)
FBUtilities.waitOnFutures(ks.flush());
assertRowsIgnoringOrder(execute("SELECT v1, p, v2, WRITETIME(v2) from mv"), row(1, 3, null, null));
// sstable-4
updateView("UPdate %s using timestamp 4 set v1 = 2 where p = 3;");
if (flush)
FBUtilities.waitOnFutures(ks.flush());
assertRowsIgnoringOrder(execute("SELECT v1, p, v2, WRITETIME(v2) from mv"), row(2, 3, null, null));
// sstable-5
updateView("UPdate %s using timestamp 5 set v1 = 1 where p = 3;");
if (flush)
FBUtilities.waitOnFutures(ks.flush());
assertRowsIgnoringOrder(execute("SELECT v1, p, v2, WRITETIME(v2) from mv"), row(1, 3, null, null));
if (flush)
{
// compact sstable 2 and 4, 5;
ColumnFamilyStore cfs = ks.getColumnFamilyStore("mv");
List<String> sstables = cfs.getLiveSSTables()
.stream()
.sorted((s1, s2) -> s1.descriptor.generation - s2.descriptor.generation)
.map(s -> s.getFilename())
.collect(Collectors.toList());
String dataFiles = String.join(",", Arrays.asList(sstables.get(1), sstables.get(3), sstables.get(4)));
CompactionManager.instance.forceUserDefinedCompaction(dataFiles);
assertEquals(3, cfs.getLiveSSTables().size());
}
// regular tombstone should be retained after compaction
assertRowsIgnoringOrder(execute("SELECT v1, p, v2, WRITETIME(v2) from mv"), row(1, 3, null, null));
}
@Test
public void testUnselectedColumnWithExpiredLivenessInfo() throws Throwable
{
boolean flush = true;
createTable("create table %s (k int, c int, a int, b int, PRIMARY KEY(k, c))");
execute("USE " + keyspace());
executeNet(protocolVersion, "USE " + keyspace());
Keyspace ks = Keyspace.open(keyspace());
createView("mv",
"create materialized view %s as select k,c,b from %%s where c is not null and k is not null primary key (c, k);");
ks.getColumnFamilyStore("mv").disableAutoCompaction();
// sstable-1, Set initial values TS=1
updateViewWithFlush("UPDATE %s SET a = 1 WHERE k = 1 AND c = 1;", flush);
assertRowsIgnoringOrder(execute("SELECT * from %s WHERE k = 1 AND c = 1;"),
row(1, 1, 1, null));
assertRowsIgnoringOrder(execute("SELECT k,c,b from mv WHERE k = 1 AND c = 1;"),
row(1, 1, null));
// sstable-2
updateViewWithFlush("INSERT INTO %s(k,c) VALUES(1,1) USING TTL 5", flush);
assertRowsIgnoringOrder(execute("SELECT * from %s WHERE k = 1 AND c = 1;"),
row(1, 1, 1, null));
assertRowsIgnoringOrder(execute("SELECT k,c,b from mv WHERE k = 1 AND c = 1;"),
row(1, 1, null));
Thread.sleep(5001);
assertRowsIgnoringOrder(execute("SELECT * from %s WHERE k = 1 AND c = 1;"),
row(1, 1, 1, null));
assertRowsIgnoringOrder(execute("SELECT k,c,b from mv WHERE k = 1 AND c = 1;"),
row(1, 1, null));
// sstable-3
updateViewWithFlush("Update %s set a = null where k = 1 AND c = 1;", flush);
assertRowsIgnoringOrder(execute("SELECT * from %s WHERE k = 1 AND c = 1;"));
assertRowsIgnoringOrder(execute("SELECT k,c,b from mv WHERE k = 1 AND c = 1;"));
// sstable-4
updateViewWithFlush("Update %s USING TIMESTAMP 1 set b = 1 where k = 1 AND c = 1;", flush);
assertRowsIgnoringOrder(execute("SELECT * from %s WHERE k = 1 AND c = 1;"),
row(1, 1, null, 1));
assertRowsIgnoringOrder(execute("SELECT k,c,b from mv WHERE k = 1 AND c = 1;"),
row(1, 1, 1));
}
@Test
public void testUpdateWithColumnTimestampSmallerThanPkWithFlush() throws Throwable
{
testUpdateWithColumnTimestampSmallerThanPk(true);
}
@Test
public void testUpdateWithColumnTimestampSmallerThanPkWithoutFlush() throws Throwable
{
testUpdateWithColumnTimestampSmallerThanPk(false);
}
public void testUpdateWithColumnTimestampSmallerThanPk(boolean flush) throws Throwable
{
createTable("create table %s (p int primary key, v1 int, v2 int)");
execute("USE " + keyspace());
executeNet(protocolVersion, "USE " + keyspace());
Keyspace ks = Keyspace.open(keyspace());
createView("mv",
"create materialized view %s as select * from %%s where p is not null and v1 is not null primary key (v1, p);");
ks.getColumnFamilyStore("mv").disableAutoCompaction();
// reset value
updateView("Insert into %s (p, v1, v2) values (3, 1, 3) using timestamp 6;");
if (flush)
FBUtilities.waitOnFutures(ks.flush());
assertRowsIgnoringOrder(execute("SELECT v1, p, v2, WRITETIME(v2) from mv"), row(1, 3, 3, 6L));
// increase pk's timestamp to 20
updateView("Insert into %s (p) values (3) using timestamp 20;");
if (flush)
FBUtilities.waitOnFutures(ks.flush());
assertRowsIgnoringOrder(execute("SELECT v1, p, v2, WRITETIME(v2) from mv"), row(1, 3, 3, 6L));
// change v1's to 2 and remove existing view row with ts7
updateView("UPdate %s using timestamp 7 set v1 = 2 where p = 3;");
if (flush)
FBUtilities.waitOnFutures(ks.flush());
assertRowsIgnoringOrder(execute("SELECT v1, p, v2, WRITETIME(v2) from mv"), row(2, 3, 3, 6L));
assertRowsIgnoringOrder(execute("SELECT v1, p, v2, WRITETIME(v2) from mv limit 1"), row(2, 3, 3, 6L));
// change v1's to 1 and remove existing view row with ts8
updateView("UPdate %s using timestamp 8 set v1 = 1 where p = 3;");
if (flush)
FBUtilities.waitOnFutures(ks.flush());
assertRowsIgnoringOrder(execute("SELECT v1, p, v2, WRITETIME(v2) from mv"), row(1, 3, 3, 6L));
}
@Test
public void testExpiredLivenessLimitWithFlush() throws Throwable
{
// CASSANDRA-13883
testExpiredLivenessLimit(true);
}
@Test
public void testExpiredLivenessLimitWithoutFlush() throws Throwable
{
// CASSANDRA-13883
testExpiredLivenessLimit(false);
}
private void testExpiredLivenessLimit(boolean flush) throws Throwable
{
createTable("CREATE TABLE %s (k int PRIMARY KEY, a int, b int);");
execute("USE " + keyspace());
executeNet(protocolVersion, "USE " + keyspace());
Keyspace ks = Keyspace.open(keyspace());
createView("mv1", "CREATE MATERIALIZED VIEW %s AS SELECT * FROM %%s WHERE k IS NOT NULL AND a IS NOT NULL PRIMARY KEY (k, a);");
createView("mv2", "CREATE MATERIALIZED VIEW %s AS SELECT * FROM %%s WHERE k IS NOT NULL AND a IS NOT NULL PRIMARY KEY (a, k);");
ks.getColumnFamilyStore("mv1").disableAutoCompaction();
ks.getColumnFamilyStore("mv2").disableAutoCompaction();
for (int i = 1; i <= 100; i++)
updateView("INSERT INTO %s(k, a, b) VALUES (?, ?, ?);", i, i, i);
for (int i = 1; i <= 100; i++)
{
if (i % 50 == 0)
continue;
// create expired liveness
updateView("DELETE a FROM %s WHERE k = ?;", i);
}
if (flush)
{
ks.getColumnFamilyStore("mv1").forceBlockingFlush();
ks.getColumnFamilyStore("mv2").forceBlockingFlush();
}
for (String view : Arrays.asList("mv1", "mv2"))
{
// paging
assertEquals(1, executeNetWithPaging(String.format("SELECT k,a,b FROM %s limit 1", view), 1).all().size());
assertEquals(2, executeNetWithPaging(String.format("SELECT k,a,b FROM %s limit 2", view), 1).all().size());
assertEquals(2, executeNetWithPaging(String.format("SELECT k,a,b FROM %s", view), 1).all().size());
assertRowsNet(executeNetWithPaging(String.format("SELECT k,a,b FROM %s ", view), 1),
row(50, 50, 50),
row(100, 100, 100));
// limit
assertEquals(1, execute(String.format("SELECT k,a,b FROM %s limit 1", view)).size());
assertRowsIgnoringOrder(execute(String.format("SELECT k,a,b FROM %s limit 2", view)),
row(50, 50, 50),
row(100, 100, 100));
}
}
@Test
public void testUpdateWithColumnTimestampBiggerThanPkWithFlush() throws Throwable
{
// CASSANDRA-11500
testUpdateWithColumnTimestampBiggerThanPk(true);
}
@Test
public void testUpdateWithColumnTimestampBiggerThanPkWithoutFlush() throws Throwable
{
// CASSANDRA-11500
testUpdateWithColumnTimestampBiggerThanPk(false);
}
public void testUpdateWithColumnTimestampBiggerThanPk(boolean flush) throws Throwable
{
// CASSANDRA-11500 able to shadow old view row with column ts greater tahn pk's ts and re-insert the view row
String baseTable = createTable("CREATE TABLE %s (k int PRIMARY KEY, a int, b int);");
execute("USE " + keyspace());
executeNet(protocolVersion, "USE " + keyspace());
Keyspace ks = Keyspace.open(keyspace());
createView("mv",
"CREATE MATERIALIZED VIEW %s AS SELECT * FROM %%s WHERE k IS NOT NULL AND a IS NOT NULL PRIMARY KEY (k, a);");
ks.getColumnFamilyStore("mv").disableAutoCompaction();
updateView("DELETE FROM %s USING TIMESTAMP 0 WHERE k = 1;");
if (flush)
FBUtilities.waitOnFutures(ks.flush());
// sstable-1, Set initial values TS=1
updateView("INSERT INTO %s(k, a, b) VALUES (1, 1, 1) USING TIMESTAMP 1;");
if (flush)
FBUtilities.waitOnFutures(ks.flush());
assertRowsIgnoringOrder(execute("SELECT k,a,b from mv"), row(1, 1, 1));
updateView("UPDATE %s USING TIMESTAMP 10 SET b = 2 WHERE k = 1;");
assertRowsIgnoringOrder(execute("SELECT k,a,b from mv"), row(1, 1, 2));
if (flush)
FBUtilities.waitOnFutures(ks.flush());
assertRowsIgnoringOrder(execute("SELECT k,a,b from mv"), row(1, 1, 2));
updateView("UPDATE %s USING TIMESTAMP 2 SET a = 2 WHERE k = 1;");
assertRowsIgnoringOrder(execute("SELECT k,a,b from mv"), row(1, 2, 2));
if (flush)
FBUtilities.waitOnFutures(ks.flush());
ks.getColumnFamilyStore("mv").forceMajorCompaction();
assertRowsIgnoringOrder(execute("SELECT k,a,b from mv"), row(1, 2, 2));
assertRowsIgnoringOrder(execute("SELECT k,a,b from mv limit 1"), row(1, 2, 2));
updateView("UPDATE %s USING TIMESTAMP 11 SET a = 1 WHERE k = 1;");
if (flush)
FBUtilities.waitOnFutures(ks.flush());
assertRowsIgnoringOrder(execute("SELECT k,a,b from mv"), row(1, 1, 2));
assertRowsIgnoringOrder(execute("SELECT k,a,b from %s"), row(1, 1, 2));
// set non-key base column as tombstone, view row is removed with shadowable
updateView("UPDATE %s USING TIMESTAMP 12 SET a = null WHERE k = 1;");
if (flush)
FBUtilities.waitOnFutures(ks.flush());
assertRowsIgnoringOrder(execute("SELECT k,a,b from mv"));
assertRowsIgnoringOrder(execute("SELECT k,a,b from %s"), row(1, null, 2));
// column b should be alive
updateView("UPDATE %s USING TIMESTAMP 13 SET a = 1 WHERE k = 1;");
if (flush)
FBUtilities.waitOnFutures(ks.flush());
assertRowsIgnoringOrder(execute("SELECT k,a,b from mv"), row(1, 1, 2));
assertRowsIgnoringOrder(execute("SELECT k,a,b from %s"), row(1, 1, 2));
assertInvalidMessage(String.format("Cannot drop column a on base table %s with materialized views.", baseTable), "ALTER TABLE %s DROP a");
}
@Test
public void testNonBaseColumnInViewPkWithFlush() throws Throwable
{
testNonBaseColumnInViewPk(true);
}
@Test
public void testNonBaseColumnInViewPkWithoutFlush() throws Throwable
{
testNonBaseColumnInViewPk(true);
}
public void testNonBaseColumnInViewPk(boolean flush) throws Throwable
{
createTable("create table %s (p1 int, p2 int, v1 int, v2 int, primary key (p1,p2))");
execute("USE " + keyspace());
executeNet(protocolVersion, "USE " + keyspace());
Keyspace ks = Keyspace.open(keyspace());
createView("mv",
"create materialized view %s as select * from %%s where p1 is not null and p2 is not null primary key (p2, p1)"
+ " with gc_grace_seconds=5;");
ColumnFamilyStore cfs = ks.getColumnFamilyStore("mv");
cfs.disableAutoCompaction();
updateView("UPDATE %s USING TIMESTAMP 1 set v1 =1 where p1 = 1 AND p2 = 1;");
if (flush)
FBUtilities.waitOnFutures(ks.flush());
assertRowsIgnoringOrder(execute("SELECT p1, p2, v1, v2 from %s"), row(1, 1, 1, null));
assertRowsIgnoringOrder(execute("SELECT p1, p2, v1, v2 from mv"), row(1, 1, 1, null));
updateView("UPDATE %s USING TIMESTAMP 2 set v1 = null, v2 = 1 where p1 = 1 AND p2 = 1;");
if (flush)
FBUtilities.waitOnFutures(ks.flush());
assertRowsIgnoringOrder(execute("SELECT p1, p2, v1, v2 from %s"), row(1, 1, null, 1));
assertRowsIgnoringOrder(execute("SELECT p1, p2, v1, v2 from mv"), row(1, 1, null, 1));
updateView("UPDATE %s USING TIMESTAMP 2 set v2 = null where p1 = 1 AND p2 = 1;");
if (flush)
FBUtilities.waitOnFutures(ks.flush());
assertRowsIgnoringOrder(execute("SELECT p1, p2, v1, v2 from %s"));
assertRowsIgnoringOrder(execute("SELECT p1, p2, v1, v2 from mv"));
updateView("INSERT INTO %s (p1,p2) VALUES(1,1) USING TIMESTAMP 3;");
if (flush)
FBUtilities.waitOnFutures(ks.flush());
assertRowsIgnoringOrder(execute("SELECT p1, p2, v1, v2 from %s"), row(1, 1, null, null));
assertRowsIgnoringOrder(execute("SELECT p1, p2, v1, v2 from mv"), row(1, 1, null, null));
updateView("DELETE FROM %s USING TIMESTAMP 4 WHERE p1 =1 AND p2 = 1;");
if (flush)
FBUtilities.waitOnFutures(ks.flush());
assertRowsIgnoringOrder(execute("SELECT p1, p2, v1, v2 from %s"));
assertRowsIgnoringOrder(execute("SELECT p1, p2, v1, v2 from mv"));
updateView("UPDATE %s USING TIMESTAMP 5 set v2 = 1 where p1 = 1 AND p2 = 1;");
if (flush)
FBUtilities.waitOnFutures(ks.flush());
assertRowsIgnoringOrder(execute("SELECT p1, p2, v1, v2 from %s"), row(1, 1, null, 1));
assertRowsIgnoringOrder(execute("SELECT p1, p2, v1, v2 from mv"), row(1, 1, null, 1));
}
@Test
public void testStrictLivenessTombstone() throws Throwable
{
createTable("create table %s (p int primary key, v1 int, v2 int)");
execute("USE " + keyspace());
executeNet(protocolVersion, "USE " + keyspace());
Keyspace ks = Keyspace.open(keyspace());
createView("mv",
"create materialized view %s as select * from %%s where p is not null and v1 is not null primary key (v1, p)"
+ " with gc_grace_seconds=5;");
ColumnFamilyStore cfs = ks.getColumnFamilyStore("mv");
cfs.disableAutoCompaction();
updateView("Insert into %s (p, v1, v2) values (1, 1, 1) ;");
assertRowsIgnoringOrder(execute("SELECT p, v1, v2 from mv"), row(1, 1, 1));
updateView("Update %s set v1 = null WHERE p = 1");
FBUtilities.waitOnFutures(ks.flush());
assertRowsIgnoringOrder(execute("SELECT p, v1, v2 from mv"));
cfs.forceMajorCompaction(); // before gc grace second, strict-liveness tombstoned dead row remains
assertEquals(1, cfs.getLiveSSTables().size());
Thread.sleep(6000);
assertEquals(1, cfs.getLiveSSTables().size()); // no auto compaction.
cfs.forceMajorCompaction(); // after gc grace second, no data left
assertEquals(0, cfs.getLiveSSTables().size());
updateView("Update %s using ttl 5 set v1 = 1 WHERE p = 1");
FBUtilities.waitOnFutures(ks.flush());
assertRowsIgnoringOrder(execute("SELECT p, v1, v2 from mv"), row(1, 1, 1));
cfs.forceMajorCompaction(); // before ttl+gc_grace_second, strict-liveness ttled dead row remains
assertEquals(1, cfs.getLiveSSTables().size());
assertRowsIgnoringOrder(execute("SELECT p, v1, v2 from mv"), row(1, 1, 1));
Thread.sleep(5500); // after expired, before gc_grace_second
cfs.forceMajorCompaction();// before ttl+gc_grace_second, strict-liveness ttled dead row remains
assertEquals(1, cfs.getLiveSSTables().size());
assertRowsIgnoringOrder(execute("SELECT p, v1, v2 from mv"));
Thread.sleep(5500); // after expired + gc_grace_second
assertEquals(1, cfs.getLiveSSTables().size()); // no auto compaction.
cfs.forceMajorCompaction(); // after gc grace second, no data left
assertEquals(0, cfs.getLiveSSTables().size());
}
@Test
public void testCellTombstoneAndShadowableTombstonesWithFlush() throws Throwable
{
testCellTombstoneAndShadowableTombstones(true);
}
@Test
public void testCellTombstoneAndShadowableTombstonesWithoutFlush() throws Throwable
{
testCellTombstoneAndShadowableTombstones(false);
}
private void testCellTombstoneAndShadowableTombstones(boolean flush) throws Throwable
{
createTable("create table %s (p int primary key, v1 int, v2 int)");
execute("USE " + keyspace());
executeNet(protocolVersion, "USE " + keyspace());
Keyspace ks = Keyspace.open(keyspace());
createView("mv",
"create materialized view %s as select * from %%s where p is not null and v1 is not null primary key (v1, p);");
ks.getColumnFamilyStore("mv").disableAutoCompaction();
// sstable 1, Set initial values TS=1
updateView("Insert into %s (p, v1, v2) values (3, 1, 3) using timestamp 1;");
if (flush)
FBUtilities.waitOnFutures(ks.flush());
assertRowsIgnoringOrder(execute("SELECT v2, WRITETIME(v2) from mv WHERE v1 = ? AND p = ?", 1, 3), row(3, 1L));
// sstable 2
updateView("UPdate %s using timestamp 2 set v2 = null where p = 3");
if (flush)
FBUtilities.waitOnFutures(ks.flush());
assertRowsIgnoringOrder(execute("SELECT v2, WRITETIME(v2) from mv WHERE v1 = ? AND p = ?", 1, 3),
row(null, null));
// sstable 3
updateView("UPdate %s using timestamp 3 set v1 = 2 where p = 3");
if (flush)
FBUtilities.waitOnFutures(ks.flush());
assertRowsIgnoringOrder(execute("SELECT v1, p, v2, WRITETIME(v2) from mv"), row(2, 3, null, null));
// sstable 4
updateView("UPdate %s using timestamp 4 set v1 = 1 where p = 3");
if (flush)
FBUtilities.waitOnFutures(ks.flush());
assertRowsIgnoringOrder(execute("SELECT v1, p, v2, WRITETIME(v2) from mv"), row(1, 3, null, null));
if (flush)
{
// compact sstable 2 and 3;
ColumnFamilyStore cfs = ks.getColumnFamilyStore("mv");
List<String> sstables = cfs.getLiveSSTables()
.stream()
.sorted(Comparator.comparingInt(s -> s.descriptor.generation))
.map(s -> s.getFilename())
.collect(Collectors.toList());
String dataFiles = String.join(",", Arrays.asList(sstables.get(1), sstables.get(2)));
CompactionManager.instance.forceUserDefinedCompaction(dataFiles);
}
// cell-tombstone in sstable 4 is not compacted away, because the shadowable tombstone is shadowed by new row.
assertRowsIgnoringOrder(execute("SELECT v1, p, v2, WRITETIME(v2) from mv"), row(1, 3, null, null));
assertRowsIgnoringOrder(execute("SELECT v1, p, v2, WRITETIME(v2) from mv limit 1"), row(1, 3, null, null));
}
@Test
public void complexTimestampDeletionTestWithFlush() throws Throwable
{
complexTimestampWithbaseNonPKColumnsInViewPKDeletionTest(true);
complexTimestampWithbasePKColumnsInViewPKDeletionTest(true);
}
@Test
public void complexTimestampDeletionTestWithoutFlush() throws Throwable
{
complexTimestampWithbaseNonPKColumnsInViewPKDeletionTest(false);
complexTimestampWithbasePKColumnsInViewPKDeletionTest(false);
}
private void complexTimestampWithbasePKColumnsInViewPKDeletionTest(boolean flush) throws Throwable
{
createTable("create table %s (p1 int, p2 int, v1 int, v2 int, primary key(p1, p2))");
execute("USE " + keyspace());
executeNet(protocolVersion, "USE " + keyspace());
Keyspace ks = Keyspace.open(keyspace());
createView("mv2",
"create materialized view %s as select * from %%s where p1 is not null and p2 is not null primary key (p2, p1);");
ks.getColumnFamilyStore("mv2").disableAutoCompaction();
// Set initial values TS=1
updateView("Insert into %s (p1, p2, v1, v2) values (1, 2, 3, 4) using timestamp 1;");
if (flush)
FBUtilities.waitOnFutures(ks.flush());
assertRowsIgnoringOrder(execute("SELECT v1, v2, WRITETIME(v2) from mv2 WHERE p1 = ? AND p2 = ?", 1, 2),
row(3, 4, 1L));
// remove row/mv TS=2
updateView("Delete from %s using timestamp 2 where p1 = 1 and p2 = 2;");
if (flush)
FBUtilities.waitOnFutures(ks.flush());
// view are empty
assertRowsIgnoringOrder(execute("SELECT * from mv2"));
// insert PK with TS=3
updateView("Insert into %s (p1, p2) values (1, 2) using timestamp 3;");
if (flush)
FBUtilities.waitOnFutures(ks.flush());
// deleted column in MV remained dead
assertRowsIgnoringOrder(execute("SELECT * from mv2"), row(2, 1, null, null));
ks.getColumnFamilyStore("mv2").forceMajorCompaction();
assertRowsIgnoringOrder(execute("SELECT * from mv2"), row(2, 1, null, null));
// reset values
updateView("Insert into %s (p1, p2, v1, v2) values (1, 2, 3, 4) using timestamp 10;");
if (flush)
FBUtilities.waitOnFutures(ks.flush());
assertRowsIgnoringOrder(execute("SELECT v1, v2, WRITETIME(v2) from mv2 WHERE p1 = ? AND p2 = ?", 1, 2),
row(3, 4, 10L));
updateView("UPDATE %s using timestamp 20 SET v2 = 5 WHERE p1 = 1 and p2 = 2");
if (flush)
FBUtilities.waitOnFutures(ks.flush());
assertRowsIgnoringOrder(execute("SELECT v1, v2, WRITETIME(v2) from mv2 WHERE p1 = ? AND p2 = ?", 1, 2),
row(3, 5, 20L));
updateView("DELETE FROM %s using timestamp 10 WHERE p1 = 1 and p2 = 2");
if (flush)
FBUtilities.waitOnFutures(ks.flush());
assertRowsIgnoringOrder(execute("SELECT v1, v2, WRITETIME(v2) from mv2 WHERE p1 = ? AND p2 = ?", 1, 2),
row(null, 5, 20L));
}
public void complexTimestampWithbaseNonPKColumnsInViewPKDeletionTest(boolean flush) throws Throwable
{
createTable("create table %s (p int primary key, v1 int, v2 int)");
execute("USE " + keyspace());
executeNet(protocolVersion, "USE " + keyspace());
Keyspace ks = Keyspace.open(keyspace());
createView("mv",
"create materialized view %s as select * from %%s where p is not null and v1 is not null primary key (v1, p);");
ks.getColumnFamilyStore("mv").disableAutoCompaction();
// Set initial values TS=1
updateView("Insert into %s (p, v1, v2) values (3, 1, 5) using timestamp 1;");
if (flush)
FBUtilities.waitOnFutures(ks.flush());
assertRowsIgnoringOrder(execute("SELECT v2, WRITETIME(v2) from mv WHERE v1 = ? AND p = ?", 1, 3), row(5, 1L));
// remove row/mv TS=2
updateView("Delete from %s using timestamp 2 where p = 3;");
if (flush)
FBUtilities.waitOnFutures(ks.flush());
// view are empty
assertRowsIgnoringOrder(execute("SELECT * from mv"));
// insert PK with TS=3
updateView("Insert into %s (p, v1) values (3, 1) using timestamp 3;");
if (flush)
FBUtilities.waitOnFutures(ks.flush());
// deleted column in MV remained dead
assertRowsIgnoringOrder(execute("SELECT * from mv"), row(1, 3, null));
// insert values TS=2, it should be considered dead due to previous tombstone
updateView("Insert into %s (p, v1, v2) values (3, 1, 5) using timestamp 2;");
if (flush)
FBUtilities.waitOnFutures(ks.flush());
// deleted column in MV remained dead
assertRowsIgnoringOrder(execute("SELECT * from mv"), row(1, 3, null));
assertRowsIgnoringOrder(execute("SELECT * from mv limit 1"), row(1, 3, null));
// insert values TS=2, it should be considered dead due to previous tombstone
executeNet(protocolVersion, "UPDATE %s USING TIMESTAMP 3 SET v2 = ? WHERE p = ?", 4, 3);
if (flush)
FBUtilities.waitOnFutures(ks.flush());
assertRows(execute("SELECT v1, p, v2, WRITETIME(v2) from mv"), row(1, 3, 4, 3L));
ks.getColumnFamilyStore("mv").forceMajorCompaction();
assertRows(execute("SELECT v1, p, v2, WRITETIME(v2) from mv"), row(1, 3, 4, 3L));
assertRows(execute("SELECT v1, p, v2, WRITETIME(v2) from mv limit 1"), row(1, 3, 4, 3L));
}
@Test
public void testMVWithDifferentColumnsWithFlush() throws Throwable
{
testMVWithDifferentColumns(true);
}
@Test
public void testMVWithDifferentColumnsWithoutFlush() throws Throwable
{
testMVWithDifferentColumns(false);
}
private void testMVWithDifferentColumns(boolean flush) throws Throwable
{
createTable("CREATE TABLE %s (a int, b int, c int, d int, e int, f int, PRIMARY KEY(a, b))");
execute("USE " + keyspace());
executeNet(protocolVersion, "USE " + keyspace());
List<String> viewNames = new ArrayList<>();
List<String> mvStatements = Arrays.asList(
// all selected
"CREATE MATERIALIZED VIEW %s AS SELECT * FROM %%s WHERE a IS NOT NULL AND b IS NOT NULL PRIMARY KEY (a,b)",
// unselected e,f
"CREATE MATERIALIZED VIEW %s AS SELECT c,d FROM %%s WHERE a IS NOT NULL AND b IS NOT NULL PRIMARY KEY (a,b)",
// no selected
"CREATE MATERIALIZED VIEW %s AS SELECT a,b FROM %%s WHERE a IS NOT NULL AND b IS NOT NULL PRIMARY KEY (a,b)",
// all selected, re-order keys
"CREATE MATERIALIZED VIEW %s AS SELECT * FROM %%s WHERE a IS NOT NULL AND b IS NOT NULL PRIMARY KEY (b,a)",
// unselected e,f, re-order keys
"CREATE MATERIALIZED VIEW %s AS SELECT a,b,c,d FROM %%s WHERE a IS NOT NULL AND b IS NOT NULL PRIMARY KEY (b,a)",
// no selected, re-order keys
"CREATE MATERIALIZED VIEW %s AS SELECT a,b FROM %%s WHERE a IS NOT NULL AND b IS NOT NULL PRIMARY KEY (b,a)");
Keyspace ks = Keyspace.open(keyspace());
for (int i = 0; i < mvStatements.size(); i++)
{
String name = "mv" + i;
viewNames.add(name);
createView(name, mvStatements.get(i));
ks.getColumnFamilyStore(name).disableAutoCompaction();
}
// insert
updateViewWithFlush("INSERT INTO %s (a,b,c,d,e,f) VALUES(1,1,1,1,1,1) using timestamp 1", flush);
assertBaseViews(row(1, 1, 1, 1, 1, 1), viewNames);
updateViewWithFlush("UPDATE %s using timestamp 2 SET c=0, d=0 WHERE a=1 AND b=1", flush);
assertBaseViews(row(1, 1, 0, 0, 1, 1), viewNames);
updateViewWithFlush("UPDATE %s using timestamp 2 SET e=0, f=0 WHERE a=1 AND b=1", flush);
assertBaseViews(row(1, 1, 0, 0, 0, 0), viewNames);
updateViewWithFlush("DELETE FROM %s using timestamp 2 WHERE a=1 AND b=1", flush);
assertBaseViews(null, viewNames);
// partial update unselected, selected
updateViewWithFlush("UPDATE %s using timestamp 3 SET f=1 WHERE a=1 AND b=1", flush);
assertBaseViews(row(1, 1, null, null, null, 1), viewNames);
updateViewWithFlush("UPDATE %s using timestamp 4 SET e = 1, f=null WHERE a=1 AND b=1", flush);
assertBaseViews(row(1, 1, null, null, 1, null), viewNames);
updateViewWithFlush("UPDATE %s using timestamp 4 SET e = null WHERE a=1 AND b=1", flush);
assertBaseViews(null, viewNames);
updateViewWithFlush("UPDATE %s using timestamp 5 SET c = 1 WHERE a=1 AND b=1", flush);
assertBaseViews(row(1, 1, 1, null, null, null), viewNames);
updateViewWithFlush("UPDATE %s using timestamp 5 SET c = null WHERE a=1 AND b=1", flush);
assertBaseViews(null, viewNames);
updateViewWithFlush("UPDATE %s using timestamp 6 SET d = 1 WHERE a=1 AND b=1", flush);
assertBaseViews(row(1, 1, null, 1, null, null), viewNames);
updateViewWithFlush("UPDATE %s using timestamp 7 SET d = null WHERE a=1 AND b=1", flush);
assertBaseViews(null, viewNames);
updateViewWithFlush("UPDATE %s using timestamp 8 SET f = 1 WHERE a=1 AND b=1", flush);
assertBaseViews(row(1, 1, null, null, null, 1), viewNames);
updateViewWithFlush("UPDATE %s using timestamp 6 SET c = 1 WHERE a=1 AND b=1", flush);
assertBaseViews(row(1, 1, 1, null, null, 1), viewNames);
// view row still alive due to c=1@6
updateViewWithFlush("UPDATE %s using timestamp 8 SET f = null WHERE a=1 AND b=1", flush);
assertBaseViews(row(1, 1, 1, null, null, null), viewNames);
updateViewWithFlush("UPDATE %s using timestamp 6 SET c = null WHERE a=1 AND b=1", flush);
assertBaseViews(null, viewNames);
}
private void assertBaseViews(Object[] row, List<String> viewNames) throws Throwable
{
UntypedResultSet result = execute("SELECT * FROM %s");
if (row == null)
assertRowsIgnoringOrder(result);
else
assertRowsIgnoringOrder(result, row);
for (int i = 0; i < viewNames.size(); i++)
assertBaseView(result, execute(String.format("SELECT * FROM %s", viewNames.get(i))), viewNames.get(i));
}
private void assertBaseView(UntypedResultSet base, UntypedResultSet view, String mv)
{
List<ColumnSpecification> baseMeta = base.metadata();
List<ColumnSpecification> viewMeta = view.metadata();
Iterator<UntypedResultSet.Row> iter = base.iterator();
Iterator<UntypedResultSet.Row> viewIter = view.iterator();
List<UntypedResultSet.Row> baseData = com.google.common.collect.Lists.newArrayList(iter);
List<UntypedResultSet.Row> viewData = com.google.common.collect.Lists.newArrayList(viewIter);
if (baseData.size() != viewData.size())
fail(String.format("Mismatch number of rows in view %s: <%s>, in base <%s>",
mv,
makeRowStrings(view),
makeRowStrings(base)));
if (baseData.size() == 0)
return;
if (viewData.size() != 1)
fail(String.format("Expect only one row in view %s, but got <%s>",
mv,
makeRowStrings(view)));
UntypedResultSet.Row row = baseData.get(0);
UntypedResultSet.Row viewRow = viewData.get(0);
Map<String, ByteBuffer> baseValues = new HashMap<>();
for (int j = 0; j < baseMeta.size(); j++)
{
ColumnSpecification column = baseMeta.get(j);
ByteBuffer actualValue = row.getBytes(column.name.toString());
baseValues.put(column.name.toString(), actualValue);
}
for (int j = 0; j < viewMeta.size(); j++)
{
ColumnSpecification column = viewMeta.get(j);
String name = column.name.toString();
ByteBuffer viewValue = viewRow.getBytes(name);
if (!baseValues.containsKey(name))
{
fail(String.format("Extra column: %s with value %s in view", name, column.type.compose(viewValue)));
}
else if (!Objects.equal(baseValues.get(name), viewValue))
{
fail(String.format("Non equal column: %s, expected <%s> but got <%s>",
name,
column.type.compose(baseValues.get(name)),
column.type.compose(viewValue)));
}
}
}
}