blob: 9808c966fd562284da2f39a34a90bc9325acdc75 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.cql3.validation.entities;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import com.google.common.collect.ImmutableList;
import org.junit.BeforeClass;
import org.junit.Test;
import org.apache.cassandra.cql3.CQLTester;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.marshal.Int32Type;
import org.apache.cassandra.db.marshal.LongType;
import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.db.partitions.Partition;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.db.virtual.AbstractVirtualTable;
import org.apache.cassandra.db.virtual.SimpleDataSet;
import org.apache.cassandra.db.virtual.VirtualKeyspace;
import org.apache.cassandra.db.virtual.VirtualKeyspaceRegistry;
import org.apache.cassandra.db.virtual.VirtualTable;
import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.service.StorageServiceMBean;
import org.apache.cassandra.triggers.ITrigger;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
public class VirtualTableTest extends CQLTester
{
private static final String KS_NAME = "test_virtual_ks";
private static final String VT1_NAME = "vt1";
private static final String VT2_NAME = "vt2";
private static final String VT3_NAME = "vt3";
private static class WritableVirtualTable extends AbstractVirtualTable
{
private final ColumnMetadata valueColumn;
private final Map<String, Integer> backingMap = new HashMap<>();
WritableVirtualTable(String keyspaceName, String tableName)
{
super(TableMetadata.builder(keyspaceName, tableName)
.kind(TableMetadata.Kind.VIRTUAL)
.addPartitionKeyColumn("key", UTF8Type.instance)
.addRegularColumn("value", Int32Type.instance)
.build());
valueColumn = metadata().regularColumns().getSimple(0);
}
@Override
public DataSet data()
{
SimpleDataSet data = new SimpleDataSet(metadata());
backingMap.forEach((key, value) -> data.row(key).column("value", value));
return data;
}
@Override
public void apply(PartitionUpdate update)
{
String key = (String) metadata().partitionKeyType.compose(update.partitionKey().getKey());
update.forEach(row ->
{
Integer value = Int32Type.instance.compose(row.getCell(valueColumn).buffer());
backingMap.put(key, value);
});
}
}
@BeforeClass
public static void setUpClass()
{
TableMetadata vt1Metadata =
TableMetadata.builder(KS_NAME, VT1_NAME)
.kind(TableMetadata.Kind.VIRTUAL)
.addPartitionKeyColumn("pk", UTF8Type.instance)
.addClusteringColumn("c", UTF8Type.instance)
.addRegularColumn("v1", Int32Type.instance)
.addRegularColumn("v2", LongType.instance)
.build();
SimpleDataSet vt1data = new SimpleDataSet(vt1Metadata);
vt1data.row("pk1", "c1").column("v1", 11).column("v2", 11L)
.row("pk2", "c1").column("v1", 21).column("v2", 21L)
.row("pk1", "c2").column("v1", 12).column("v2", 12L)
.row("pk2", "c2").column("v1", 22).column("v2", 22L)
.row("pk1", "c3").column("v1", 13).column("v2", 13L)
.row("pk2", "c3").column("v1", 23).column("v2", 23L);
VirtualTable vt1 = new AbstractVirtualTable(vt1Metadata)
{
public DataSet data()
{
return vt1data;
}
};
VirtualTable vt2 = new WritableVirtualTable(KS_NAME, VT2_NAME);
TableMetadata vt3Metadata =
TableMetadata.builder(KS_NAME, VT3_NAME)
.kind(TableMetadata.Kind.VIRTUAL)
.addPartitionKeyColumn("pk1", UTF8Type.instance)
.addPartitionKeyColumn("pk2", UTF8Type.instance)
.addClusteringColumn("ck1", UTF8Type.instance)
.addClusteringColumn("ck2", UTF8Type.instance)
.addRegularColumn("v1", Int32Type.instance)
.addRegularColumn("v2", LongType.instance)
.build();
SimpleDataSet vt3data = new SimpleDataSet(vt3Metadata);
vt3data.row("pk11", "pk11", "ck11", "ck11").column("v1", 1111).column("v2", 1111L)
.row("pk11", "pk11", "ck22", "ck22").column("v1", 1122).column("v2", 1122L);
VirtualTable vt3 = new AbstractVirtualTable(vt3Metadata)
{
public DataSet data()
{
return vt3data;
}
};
VirtualKeyspaceRegistry.instance.register(new VirtualKeyspace(KS_NAME, ImmutableList.of(vt1, vt2, vt3)));
CQLTester.setUpClass();
}
@Test
public void testQueries() throws Throwable
{
assertRowsNet(executeNet("SELECT * FROM test_virtual_ks.vt1 WHERE pk = 'UNKNOWN'"));
assertRowsNet(executeNet("SELECT * FROM test_virtual_ks.vt1 WHERE pk = 'pk1' AND c = 'UNKNOWN'"));
// Test DISTINCT query
assertRowsNet(executeNet("SELECT DISTINCT pk FROM test_virtual_ks.vt1"),
row("pk1"),
row("pk2"));
assertRowsNet(executeNet("SELECT DISTINCT pk FROM test_virtual_ks.vt1 WHERE token(pk) > token('pk1')"),
row("pk2"));
// Test single partition queries
assertRowsNet(executeNet("SELECT v1, v2 FROM test_virtual_ks.vt1 WHERE pk = 'pk1' AND c = 'c1'"),
row(11, 11L));
assertRowsNet(executeNet("SELECT c, v1, v2 FROM test_virtual_ks.vt1 WHERE pk = 'pk1' AND c IN ('c1', 'c2')"),
row("c1", 11, 11L),
row("c2", 12, 12L));
assertRowsNet(executeNet("SELECT c, v1, v2 FROM test_virtual_ks.vt1 WHERE pk = 'pk1' AND c IN ('c2', 'c1') ORDER BY c DESC"),
row("c2", 12, 12L),
row("c1", 11, 11L));
// Test multi-partition queries
assertRows(execute("SELECT * FROM test_virtual_ks.vt1 WHERE pk IN ('pk2', 'pk1') AND c IN ('c2', 'c1')"),
row("pk1", "c1", 11, 11L),
row("pk1", "c2", 12, 12L),
row("pk2", "c1", 21, 21L),
row("pk2", "c2", 22, 22L));
assertRows(execute("SELECT pk, c, v1 FROM test_virtual_ks.vt1 WHERE pk IN ('pk2', 'pk1') AND c IN ('c2', 'c1') ORDER BY c DESC"),
row("pk1", "c2", 12),
row("pk2", "c2", 22),
row("pk1", "c1", 11),
row("pk2", "c1", 21));
assertRows(execute("SELECT pk, c, v1 FROM test_virtual_ks.vt1 WHERE pk IN ('pk2', 'pk1') AND c IN ('c2', 'c1') ORDER BY c DESC LIMIT 1"),
row("pk1", "c2", 12));
assertRows(execute("SELECT c, v1, v2 FROM test_virtual_ks.vt1 WHERE pk IN ('pk2', 'pk1') AND c IN ('c2', 'c1' , 'c3') ORDER BY c DESC PER PARTITION LIMIT 1"),
row("c3", 13, 13L),
row("c3", 23, 23L));
assertRows(execute("SELECT count(*) FROM test_virtual_ks.vt1 WHERE pk IN ('pk2', 'pk1') AND c IN ('c2', 'c1')"),
row(4L));
for (int pageSize = 1; pageSize < 5; pageSize++)
{
assertRowsNet(executeNetWithPaging("SELECT pk, c, v1, v2 FROM test_virtual_ks.vt1 WHERE pk IN ('pk2', 'pk1') AND c IN ('c2', 'c1')", pageSize),
row("pk1", "c1", 11, 11L),
row("pk1", "c2", 12, 12L),
row("pk2", "c1", 21, 21L),
row("pk2", "c2", 22, 22L));
assertRowsNet(executeNetWithPaging("SELECT * FROM test_virtual_ks.vt1 WHERE pk IN ('pk2', 'pk1') AND c IN ('c2', 'c1') LIMIT 2", pageSize),
row("pk1", "c1", 11, 11L),
row("pk1", "c2", 12, 12L));
assertRowsNet(executeNetWithPaging("SELECT count(*) FROM test_virtual_ks.vt1 WHERE pk IN ('pk2', 'pk1') AND c IN ('c2', 'c1')", pageSize),
row(4L));
}
// Test range queries
for (int pageSize = 1; pageSize < 4; pageSize++)
{
assertRowsNet(executeNetWithPaging("SELECT * FROM test_virtual_ks.vt1 WHERE token(pk) < token('pk2') AND c IN ('c2', 'c1') ALLOW FILTERING", pageSize),
row("pk1", "c1", 11, 11L),
row("pk1", "c2", 12, 12L));
assertRowsNet(executeNetWithPaging("SELECT * FROM test_virtual_ks.vt1 WHERE token(pk) < token('pk2') AND c IN ('c2', 'c1') LIMIT 1 ALLOW FILTERING", pageSize),
row("pk1", "c1", 11, 11L));
assertRowsNet(executeNetWithPaging("SELECT * FROM test_virtual_ks.vt1 WHERE token(pk) <= token('pk2') AND c > 'c1' PER PARTITION LIMIT 1 ALLOW FILTERING", pageSize),
row("pk1", "c2", 12, 12L),
row("pk2", "c2", 22, 22L));
assertRowsNet(executeNetWithPaging("SELECT count(*) FROM test_virtual_ks.vt1 WHERE token(pk) = token('pk2') AND c < 'c3' ALLOW FILTERING", pageSize),
row(2L));
}
}
@Test
public void testQueriesOnTableWithMultiplePks() throws Throwable
{
assertRowsNet(executeNet("SELECT * FROM test_virtual_ks.vt3 WHERE pk1 = 'UNKNOWN' AND pk2 = 'UNKNOWN'"));
assertRowsNet(executeNet("SELECT * FROM test_virtual_ks.vt3 WHERE pk1 = 'pk11' AND pk2 = 'pk22' AND ck1 = 'UNKNOWN'"));
// Test DISTINCT query
assertRowsNet(executeNet("SELECT DISTINCT pk1, pk2 FROM test_virtual_ks.vt3"),
row("pk11", "pk11"));
// Test single partition queries
assertRowsNet(executeNet("SELECT v1, v2 FROM test_virtual_ks.vt3 WHERE pk1 = 'pk11' AND pk2 = 'pk11'"),
row(1111, 1111L),
row(1122, 1122L));
}
@Test
public void testModifications() throws Throwable
{
// check for clean state
assertRows(execute("SELECT * FROM test_virtual_ks.vt2"));
// fill the table, test UNLOGGED batch
execute("BEGIN UNLOGGED BATCH " +
"UPDATE test_virtual_ks.vt2 SET value = 1 WHERE key ='pk1';" +
"UPDATE test_virtual_ks.vt2 SET value = 2 WHERE key ='pk2';" +
"UPDATE test_virtual_ks.vt2 SET value = 3 WHERE key ='pk3';" +
"APPLY BATCH");
assertRows(execute("SELECT * FROM test_virtual_ks.vt2"),
row("pk1", 1),
row("pk2", 2),
row("pk3", 3));
// test that LOGGED batches don't allow virtual table updates
assertInvalidMessage("Cannot include a virtual table statement in a logged batch",
"BEGIN BATCH " +
"UPDATE test_virtual_ks.vt2 SET value = 1 WHERE key ='pk1';" +
"UPDATE test_virtual_ks.vt2 SET value = 2 WHERE key ='pk2';" +
"UPDATE test_virtual_ks.vt2 SET value = 3 WHERE key ='pk3';" +
"APPLY BATCH");
// test that UNLOGGED batch doesn't allow mixing updates for regular and virtual tables
createTable("CREATE TABLE %s (key text PRIMARY KEY, value int)");
assertInvalidMessage("Mutations for virtual and regular tables cannot exist in the same batch",
"BEGIN UNLOGGED BATCH " +
"UPDATE test_virtual_ks.vt2 SET value = 1 WHERE key ='pk1'" +
"UPDATE %s SET value = 2 WHERE key ='pk2'" +
"UPDATE test_virtual_ks.vt2 SET value = 3 WHERE key ='pk3'" +
"APPLY BATCH");
// update a single value with UPDATE
execute("UPDATE test_virtual_ks.vt2 SET value = 11 WHERE key ='pk1'");
assertRows(execute("SELECT * FROM test_virtual_ks.vt2 WHERE key = 'pk1'"),
row("pk1", 11));
// update a single value with INSERT
executeNet("INSERT INTO test_virtual_ks.vt2 (key, value) VALUES ('pk2', 22)");
assertRows(execute("SELECT * FROM test_virtual_ks.vt2 WHERE key = 'pk2'"),
row("pk2", 22));
// test that deletions are (currently) rejected
assertInvalidMessage("Virtual tables don't support DELETE statements",
"DELETE FROM test_virtual_ks.vt2 WHERE key ='pk1'");
// test that TTL is (currently) rejected with INSERT and UPDATE
assertInvalidMessage("Expiring columns are not supported by virtual tables",
"INSERT INTO test_virtual_ks.vt2 (key, value) VALUES ('pk1', 11) USING TTL 86400");
assertInvalidMessage("Expiring columns are not supported by virtual tables",
"UPDATE test_virtual_ks.vt2 USING TTL 86400 SET value = 11 WHERE key ='pk1'");
// test that LWT is (currently) rejected with virtual tables in batches
assertInvalidMessage("Conditional BATCH statements cannot include mutations for virtual tables",
"BEGIN UNLOGGED BATCH " +
"UPDATE test_virtual_ks.vt2 SET value = 3 WHERE key ='pk3' IF value = 2;" +
"APPLY BATCH");
// test that LWT is (currently) rejected with virtual tables in UPDATEs
assertInvalidMessage("Conditional updates are not supported by virtual tables",
"UPDATE test_virtual_ks.vt2 SET value = 3 WHERE key ='pk3' IF value = 2");
// test that LWT is (currently) rejected with virtual tables in INSERTs
assertInvalidMessage("Conditional updates are not supported by virtual tables",
"INSERT INTO test_virtual_ks.vt2 (key, value) VALUES ('pk2', 22) IF NOT EXISTS");
}
@Test
public void testInvalidDDLOperations() throws Throwable
{
assertInvalidMessage("Virtual keyspace 'test_virtual_ks' is not user-modifiable",
"DROP KEYSPACE test_virtual_ks");
assertInvalidMessage("Virtual keyspace 'test_virtual_ks' is not user-modifiable",
"ALTER KEYSPACE test_virtual_ks WITH durable_writes = false");
assertInvalidMessage("Virtual keyspace 'test_virtual_ks' is not user-modifiable",
"CREATE TABLE test_virtual_ks.test (id int PRIMARY KEY)");
assertInvalidMessage("Virtual keyspace 'test_virtual_ks' is not user-modifiable",
"CREATE TYPE test_virtual_ks.type (id int)");
assertInvalidMessage("Virtual keyspace 'test_virtual_ks' is not user-modifiable",
"DROP TABLE test_virtual_ks.vt1");
assertInvalidMessage("Virtual keyspace 'test_virtual_ks' is not user-modifiable",
"ALTER TABLE test_virtual_ks.vt1 DROP v1");
assertInvalidMessage("Error during truncate: Cannot truncate virtual tables",
"TRUNCATE TABLE test_virtual_ks.vt1");
assertInvalidMessage("Virtual keyspace 'test_virtual_ks' is not user-modifiable",
"CREATE INDEX ON test_virtual_ks.vt1 (v1)");
assertInvalidMessage("Virtual keyspace 'test_virtual_ks' is not user-modifiable",
"CREATE MATERIALIZED VIEW test_virtual_ks.mvt1 AS SELECT c, v1 FROM test_virtual_ks.vt1 WHERE c IS NOT NULL PRIMARY KEY(c)");
assertInvalidMessage("Virtual keyspace 'test_virtual_ks' is not user-modifiable",
"CREATE TRIGGER test_trigger ON test_virtual_ks.vt1 USING '" + TestTrigger.class.getName() + '\'');
}
/**
* Noop trigger for audit log testing
*/
public static class TestTrigger implements ITrigger
{
public Collection<Mutation> augment(Partition update)
{
return null;
}
}
@Test
public void testMBeansMethods() throws Throwable
{
StorageServiceMBean mbean = StorageService.instance;
assertJMXFails(() -> mbean.forceKeyspaceCompaction(false, KS_NAME));
assertJMXFails(() -> mbean.forceKeyspaceCompaction(false, KS_NAME, VT1_NAME));
assertJMXFails(() -> mbean.scrub(true, true, true, true, 1, KS_NAME));
assertJMXFails(() -> mbean.scrub(true, true, true, true, 1, KS_NAME, VT1_NAME));
assertJMXFails(() -> mbean.verify(true, KS_NAME));
assertJMXFails(() -> mbean.verify(true, KS_NAME, VT1_NAME));
assertJMXFails(() -> mbean.upgradeSSTables(KS_NAME, false, 1));
assertJMXFails(() -> mbean.upgradeSSTables(KS_NAME, false, 1, VT1_NAME));
assertJMXFails(() -> mbean.garbageCollect("ROW", 1, KS_NAME, VT1_NAME));
assertJMXFails(() -> mbean.forceKeyspaceFlush(KS_NAME));
assertJMXFails(() -> mbean.forceKeyspaceFlush(KS_NAME, VT1_NAME));
assertJMXFails(() -> mbean.truncate(KS_NAME, VT1_NAME));
assertJMXFails(() -> mbean.loadNewSSTables(KS_NAME, VT1_NAME));
assertJMXFails(() -> mbean.getAutoCompactionStatus(KS_NAME));
assertJMXFails(() -> mbean.getAutoCompactionStatus(KS_NAME, VT1_NAME));
}
@FunctionalInterface
private static interface ThrowingRunnable
{
public void run() throws Throwable;
}
private void assertJMXFails(ThrowingRunnable r) throws Throwable
{
try
{
r.run();
fail();
}
catch (IllegalArgumentException e)
{
assertEquals("Cannot perform any operations against virtual keyspace " + KS_NAME, e.getMessage());
}
}
}