blob: 60a6c486575a7f833aa9c3fb83254f648a3ce33c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.stress.operations.userdefined;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import javax.naming.OperationNotSupportedException;
import com.google.common.util.concurrent.RateLimiter;
import com.datastax.driver.core.ColumnMetadata;
import com.datastax.driver.core.PagingState;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.SimpleStatement;
import com.datastax.driver.core.Statement;
import com.datastax.driver.core.TableMetadata;
import com.datastax.driver.core.Token;
import com.datastax.driver.core.TokenRange;
import org.apache.cassandra.stress.Operation;
import org.apache.cassandra.stress.StressYaml;
import org.apache.cassandra.stress.WorkManager;
import org.apache.cassandra.stress.generate.TokenRangeIterator;
import org.apache.cassandra.stress.settings.StressSettings;
import org.apache.cassandra.stress.util.JavaDriverClient;
import org.apache.cassandra.stress.util.ThriftClient;
import org.apache.cassandra.stress.util.Timer;
/**
 * A stress operation that scans whole token ranges page by page, counting the
 * rows and distinct partitions retrieved. {@code token(partition_keys)} is
 * added to the select list so that partitions can be told apart (see
 * {@link #sanitizeColumns}).
 * <p>
 * Each worker thread works on one token range at a time; its progress (current
 * range, driver paging state, partitions already seen) lives in a
 * {@link ThreadLocal} {@link State}. Only the Java driver path is implemented;
 * the thrift path throws.
 */
public class TokenRangeQuery extends Operation
{
    /** Per-thread progress through the current token range; null when no range is in flight. */
    private final ThreadLocal<State> currentState = new ThreadLocal<>();

    private final TableMetadata tableMetadata;
    private final TokenRangeIterator tokenRangeIterator;
    /** Comma-separated select list; a user-supplied '*' is expanded to explicit column names. */
    private final String columns;
    /** Driver fetch size: number of rows requested per page. */
    private final int pageSize;
    private final boolean isWarmup;

    public TokenRangeQuery(Timer timer,
                           StressSettings settings,
                           TableMetadata tableMetadata,
                           TokenRangeIterator tokenRangeIterator,
                           StressYaml.TokenRangeQueryDef def,
                           boolean isWarmup)
    {
        super(timer, settings);
        this.tableMetadata = tableMetadata;
        this.tokenRangeIterator = tokenRangeIterator;
        this.columns = sanitizeColumns(def.columns, tableMetadata);
        // When warming up, cap the page size so each range is touched quickly
        // rather than fully streamed.
        this.pageSize = isWarmup ? Math.min(100, def.page_size) : def.page_size;
        this.isWarmup = isWarmup;
    }

    /**
     * We need to specify the columns by name because we need to add token(partition_keys) in order to count
     * partitions. So if the user specifies '*' then replace it with a list of all columns.
     */
    private static String sanitizeColumns(String columns, TableMetadata tableMetadata)
    {
        if (!columns.equals("*"))
            return columns;

        return tableMetadata.getColumns().stream()
                            .map(ColumnMetadata::getName)
                            .collect(Collectors.joining(", "));
    }

    /**
     * The state of a token range currently being retrieved.
     * Here we store the paging state to retrieve more pages
     * and we keep track of which partitions have already been retrieved,
     */
    private final static class State
    {
        public final TokenRange tokenRange;
        public final String query;
        /** Driver paging state of the last executed page; null until the first page returns. */
        public PagingState pagingState;
        /** Partition-key tokens seen so far in this range, used to count distinct partitions. */
        public Set<Token> partitions = new HashSet<>();

        public State(TokenRange tokenRange, String query)
        {
            this.tokenRange = tokenRange;
            this.query = query;
        }

        @Override
        public String toString()
        {
            return String.format("[%s, %s]", tokenRange.getStart(), tokenRange.getEnd());
        }
    }

    /** Base runner accumulating the partition/row counts reported back to the timer. */
    abstract static class Runner implements RunOp
    {
        int partitionCount;
        int rowCount;

        @Override
        public int partitionCount()
        {
            return partitionCount;
        }

        @Override
        public int rowCount()
        {
            return rowCount;
        }
    }

    private class JavaDriverRun extends Runner
    {
        final JavaDriverClient client;

        private JavaDriverRun(JavaDriverClient client)
        {
            this.client = client;
        }

        /**
         * Fetches one page of the current token range (starting a new range if
         * none is in flight), counting rows and newly seen partitions. Returns
         * true; the thread-local state is cleared once a range is finished so
         * the next call moves on to another range.
         */
        public boolean run() throws Exception
        {
            State state = currentState.get();
            if (state == null)
            { // start processing a new token range
                TokenRange range = tokenRangeIterator.next();
                if (range == null)
                    return true; // no more token ranges to process

                state = new State(range, buildQuery(range));
                currentState.set(state);
            }

            ResultSet results;
            Statement statement = new SimpleStatement(state.query);
            statement.setFetchSize(pageSize);
            if (state.pagingState != null)
                statement.setPagingState(state.pagingState);

            results = client.getSession().execute(statement);
            state.pagingState = results.getExecutionInfo().getPagingState();

            // Only consume rows already in the current page; iterating past
            // them would synchronously fetch further pages inside this op.
            int remaining = results.getAvailableWithoutFetching();
            rowCount += remaining;

            if (remaining > 0)
            {
                for (Row row : results)
                {
                    // this call will only succeed if we've added token(partition keys) to the query
                    Token partition = row.getPartitionKeyToken();
                    if (state.partitions.add(partition))
                        partitionCount += 1; // first time we've seen this partition in this range

                    if (--remaining == 0)
                        break;
                }
            }

            if (results.isExhausted() || isWarmup)
            { // no more pages to fetch or just warming up, ready to move on to another token range
                currentState.set(null);
            }
            return true;
        }
    }

    /**
     * Builds the SELECT for one token range, bounding it with
     * {@code token(pk) > start AND token(pk) <= end}; an open start or end
     * (null) simply drops that bound.
     */
    private String buildQuery(TokenRange tokenRange)
    {
        Token start = tokenRange.getStart();
        Token end = tokenRange.getEnd();

        List<String> pkColumns = tableMetadata.getPartitionKey().stream().map(ColumnMetadata::getName).collect(Collectors.toList());
        String tokenStatement = String.format("token(%s)", String.join(", ", pkColumns));

        StringBuilder ret = new StringBuilder();
        ret.append("SELECT ");
        ret.append(tokenStatement); // add the token(pk) statement so that we can count partitions
        ret.append(", ");
        ret.append(columns);
        ret.append(" FROM ");
        ret.append(tableMetadata.getName());
        if (start != null || end != null)
            ret.append(" WHERE ");
        if (start != null)
        {
            ret.append(tokenStatement);
            ret.append(" > ");
            ret.append(start.toString());
        }
        if (start != null && end != null)
            ret.append(" AND ");
        if (end != null)
        {
            ret.append(tokenStatement);
            ret.append(" <= ");
            ret.append(end.toString());
        }

        return ret.toString();
    }

    private static class ThriftRun extends Runner
    {
        final ThriftClient client;

        private ThriftRun(ThriftClient client)
        {
            this.client = client;
        }

        /** Token range scans have no thrift implementation; always throws. */
        public boolean run() throws Exception
        {
            throw new OperationNotSupportedException("Bulk read over thrift not supported");
        }
    }

    @Override
    public void run(JavaDriverClient client) throws IOException
    {
        timeWithRetry(new JavaDriverRun(client));
    }

    @Override
    public void run(ThriftClient client) throws IOException
    {
        timeWithRetry(new ThriftRun(client));
    }

    /**
     * Returns true when this thread may run another op: there is either an
     * unfinished range in flight or more ranges to hand out, and the work
     * manager grants a permit (rate-limited if a limiter is configured).
     */
    public boolean ready(WorkManager workManager, RateLimiter rateLimiter)
    {
        tokenRangeIterator.update();

        if (tokenRangeIterator.exhausted() && currentState.get() == null)
            return false;

        int numLeft = workManager.takePermits(1);
        if (rateLimiter != null && numLeft > 0)
            rateLimiter.acquire(numLeft);

        return numLeft > 0;
    }

    /** A label for the range currently being processed by this thread, or "-" if none. */
    public String key()
    {
        State state = currentState.get();
        return state == null ? "-" : state.toString();
    }
}