blob: 3b55dcf27d3bb4316126802f03a30320c87068f3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.distributed.test;
import java.io.IOException;
import java.net.InetAddress;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.distributed.api.ConsistencyLevel;
import org.apache.cassandra.utils.TimeUUID;
public class SecondaryIndexTest extends TestBaseImpl
{
private static final int NUM_NODES = 3;
private static final int REPLICATION_FACTOR = 1;
private static final String CREATE_TABLE = "CREATE TABLE %s(k int, v int, PRIMARY KEY (k))";
private static final String CREATE_INDEX = "CREATE INDEX v_index_%d ON %s(v)";
private static final AtomicInteger seq = new AtomicInteger();
private static String tableName;
private static Cluster cluster;
@BeforeClass
public static void setupCluster() throws IOException
{
cluster = init(Cluster.build(NUM_NODES).start(), REPLICATION_FACTOR);
}
@AfterClass
public static void teardownCluster()
{
if (cluster != null)
cluster.close();
}
@Before
public void before()
{
tableName = String.format("%s.t_%d", KEYSPACE, seq.getAndIncrement());
cluster.schemaChange(String.format(CREATE_TABLE, tableName));
cluster.schemaChange(String.format(CREATE_INDEX, seq.get(), tableName));
}
@After
public void after()
{
cluster.schemaChange(String.format("DROP TABLE %s", tableName));
}
@Test
public void test_only_coordinator_chooses_index_for_query()
{
for (int i = 0 ; i < 99 ; ++i)
cluster.coordinator(1).execute(String.format("INSERT INTO %s (k, v) VALUES (?, ?)", tableName), ConsistencyLevel.ALL, i, i/3);
cluster.forEach(i -> i.flush(KEYSPACE));
Pattern indexScanningPattern =
Pattern.compile(String.format("Index mean cardinalities are v_index_%d:[0-9]+. Scanning with v_index_%d.", seq.get(), seq.get()));
for (int i = 0 ; i < 33; ++i)
{
UUID trace = TimeUUID.Generator.nextTimeUUID().asUUID();
Object[][] result = cluster.coordinator(1).executeWithTracing(trace, String.format("SELECT * FROM %s WHERE v = ?", tableName), ConsistencyLevel.ALL, i);
Assert.assertEquals("Failed on iteration " + i, 3, result.length);
Awaitility.await("For all events in the tracing session to persist")
.pollInterval(100, TimeUnit.MILLISECONDS)
.atMost(10, TimeUnit.SECONDS)
.untilAsserted(() ->
{
Object[][] traces = cluster.coordinator(1)
.execute("SELECT source, activity FROM system_traces.events WHERE session_id = ?",
ConsistencyLevel.ALL, trace);
List<InetAddress> scanning =
Arrays.stream(traces)
.filter(t -> indexScanningPattern.matcher(t[1].toString()).matches())
.map(t -> (InetAddress) t[0])
.distinct().collect(Collectors.toList());
List<InetAddress> executing =
Arrays.stream(traces)
.filter(t -> t[1].toString().equals(String.format("Executing read on " + tableName + " using index v_index_%d", seq.get())))
.map(t -> (InetAddress) t[0])
.distinct().collect(Collectors.toList());
Assert.assertEquals(Collections.singletonList(cluster.get(1).broadcastAddress().getAddress()), scanning);
Assert.assertEquals(3, executing.size());
});
}
}
}